Select statement with onSend/onReceive/onAwait clauses
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index e011e2b..4db38bb 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -93,10 +93,10 @@
 // --------------- implementation ---------------
 
 private open class StandaloneCoroutine(
-    val parentContext: CoroutineContext,
+    override val parentContext: CoroutineContext,
     active: Boolean
-) : AbstractCoroutine<Unit>(parentContext, active) {
-    override fun afterCompletion(state: Any?) {
+) : AbstractCoroutine<Unit>(active) {
+    override fun afterCompletion(state: Any?, mode: Int) {
         // note the use of the parent's job context below!
         if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
     }
@@ -119,13 +119,13 @@
 }
 
 private class BlockingCoroutine<T>(
-    context: CoroutineContext,
+    override val parentContext: CoroutineContext,
     val blockedThread: Thread,
     val hasPrivateEventLoop: Boolean
-) : AbstractCoroutine<T>(context, active = true) {
-    val eventLoop: EventLoop? = context[ContinuationInterceptor] as? EventLoop
+) : AbstractCoroutine<T>(active = true) {
+    val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
 
-    override fun afterCompletion(state: Any?) {
+    override fun afterCompletion(state: Any?, mode: Int) {
         if (Thread.currentThread() != blockedThread)
             LockSupport.unpark(blockedThread)
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 3877fc8..3663878 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -19,7 +19,7 @@
 import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
 import kotlin.coroutines.experimental.Continuation
-import kotlin.coroutines.experimental.ContinuationInterceptor
+import kotlin.coroutines.experimental.CoroutineContext
 import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
 import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
 import kotlin.coroutines.experimental.suspendCoroutine
@@ -57,18 +57,27 @@
      * Tries to resume this continuation with a given value and returns non-null object token if it was successful,
      * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
      * [completeResume] must be invoked with it.
+     *
+     * When [idempotent] is not `null`, this function performs an _idempotent_ operation, so that
+     * further invocations with the same non-null reference produce the same result.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
      */
-    public fun tryResume(value: T): Any?
+    public fun tryResume(value: T, idempotent: Any? = null): Any?
 
     /**
      * Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
      * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
      * [completeResume] must be invoked with it.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
      */
     public fun tryResumeWithException(exception: Throwable): Any?
 
     /**
      * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
+     *
+     * @suppress **This is unstable API and it is subject to change.**
      */
     public fun completeResume(token: Any)
 
@@ -113,7 +122,7 @@
     crossinline block: (CancellableContinuation<T>) -> Unit
 ): T =
     suspendCoroutineOrReturn { cont ->
-        val cancellable = CancellableContinuationImpl(cont, getParentJobOrAbort(cont), active = true)
+        val cancellable = CancellableContinuationImpl(cont, active = true)
         if (!holdCancellability) cancellable.initCancellability()
         block(cancellable)
         cancellable.getResult()
@@ -141,57 +150,66 @@
 }
 
 @PublishedApi
-internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
-    val job = cont.context[Job]
-    // fast path when parent job is already complete (we don't even construct CancellableContinuationImpl object)
-    if (job != null && !job.isActive) throw job.getCompletionException()
-    return job
-}
-
-@PublishedApi
 internal open class CancellableContinuationImpl<in T>(
-    private val delegate: Continuation<T>,
-    private val parentJob: Job?,
+    @JvmField
+    protected val delegate: Continuation<T>,
     active: Boolean
-) : AbstractCoroutine<T>(delegate.context, active), CancellableContinuation<T> {
+) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
     @Volatile
     private var decision = UNDECIDED
 
-    private companion object {
+    override val parentContext: CoroutineContext
+        get() = delegate.context
+
+    protected companion object {
+        @JvmStatic
         val DECISION: AtomicIntegerFieldUpdater<CancellableContinuationImpl<*>> =
                 AtomicIntegerFieldUpdater.newUpdater(CancellableContinuationImpl::class.java, "decision")
 
         const val UNDECIDED = 0
         const val SUSPENDED = 1
         const val RESUMED = 2
-        const val YIELD = 3 // used by cancellable "yield"
-        const val UNDISPATCHED = 4 // used by "undispatchedXXX"
+
+        const val MODE_UNDISPATCHED = 1
+        const val MODE_DIRECT = 2
+
+        @Suppress("UNCHECKED_CAST")
+        fun <T> getSuccessfulResult(state: Any?): T = if (state is CompletedIdempotentResult) state.result as T else state as T
     }
 
     override fun initCancellability() {
-        initParentJob(parentJob)
+        initParentJob(delegate.context[Job])
     }
 
     @PublishedApi
     internal fun getResult(): Any? {
         val decision = this.decision // volatile read
-        when (decision) {
-            UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
-            YIELD -> return COROUTINE_SUSPENDED
-        }
+        if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
         // otherwise, afterCompletion was already invoked, and the result is in the state
         val state = this.state
         if (state is CompletedExceptionally) throw state.exception
-        return state
+        return getSuccessfulResult(state)
     }
 
     override val isCancelled: Boolean get() = state is Cancelled
 
-    override fun tryResume(value: T): Any? {
+    override fun tryResume(value: T, idempotent: Any?): Any? {
         while (true) { // lock-free loop on state
             val state = this.state // atomic read
             when (state) {
-                is Incomplete -> if (tryUpdateState(state, value)) return state
+                is Incomplete -> {
+                    val idempotentStart = state.idempotentStart
+                    val update: Any? = if (idempotent == null && idempotentStart == null) value else
+                        CompletedIdempotentResult(idempotentStart, idempotent, value, state) // keeps markers and token
+                    if (tryUpdateState(state, update)) return state // the replaced state doubles as the resume token
+                }
+                is CompletedIdempotentResult -> {
+                    // A null marker is a plain (non-idempotent) resume and must never match a stored result:
+                    // results stored only because of idempotentStart also carry a null idempotentResume.
+                    if (idempotent == null || state.idempotentResume !== idempotent) return null
+                    check(state.result === value) { "Non-idempotent resume" }
+                    return state.token // same marker, same value -- hand out the original token again
+                }
                 else -> return null // cannot resume -- not active anymore
             }
         }
@@ -201,56 +219,69 @@
         while (true) { // lock-free loop on state
             val state = this.state // atomic read
             when (state) {
-                is Incomplete -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
+                is Incomplete -> {
+                    if (tryUpdateState(state, CompletedExceptionally(state.idempotentStart, exception))) return state
+                }
                 else -> return null // cannot resume -- not active anymore
             }
         }
     }
 
     override fun completeResume(token: Any) {
-        completeUpdateState(token, state)
+        completeUpdateState(token, state, mode = 0)
     }
 
-    @Suppress("UNCHECKED_CAST")
-    override fun afterCompletion(state: Any?) {
+    override fun afterCompletion(state: Any?, mode: Int) {
         val decision = this.decision // volatile read
         if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
         // otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
-        when {
-            decision == UNDISPATCHED -> undispatchedCompletion(state)
-            state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
-            decision == YIELD && delegate is DispatchedContinuation -> delegate.resumeYield(parentJob, state as T)
-            else -> delegate.resume(state as T)
+        if (state is CompletedExceptionally) {
+            val exception = state.exception
+            when (mode) {
+                0 -> delegate.resumeWithException(exception)
+                MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
+                MODE_DIRECT -> {
+                    if (delegate is DispatchedContinuation)
+                        delegate.continuation.resumeWithException(exception)
+                    else
+                        delegate.resumeWithException(exception)
+                }
+                else -> error("Invalid mode $mode")
+            }
+        } else {
+            val value = getSuccessfulResult<T>(state)
+            when (mode) {
+                0 -> delegate.resume(value)
+                MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
+                MODE_DIRECT -> {
+                    if (delegate is DispatchedContinuation)
+                        delegate.continuation.resume(value)
+                    else
+                        delegate.resume(value)
+                }
+                else -> error("Invalid mode $mode")
+            }
         }
     }
 
-    @Suppress("UNCHECKED_CAST")
-    private fun undispatchedCompletion(state: Any?) {
-        delegate as DispatchedContinuation // type assertion -- was checked in resumeUndispatched
-        if (state is CompletedExceptionally)
-            delegate.resumeUndispatchedWithException(state.exception)
-        else
-            delegate.resumeUndispatched(state as T)
-    }
-
-    // can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
-    fun resumeYield(value: T) {
-        if ((context[ContinuationInterceptor] as? CoroutineDispatcher)?.isDispatchNeeded(context) == true)
-            DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
-        resume(value)
-    }
-
     override fun CoroutineDispatcher.resumeUndispatched(value: T) {
         val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
         check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
-        DECISION.compareAndSet(this@CancellableContinuationImpl, SUSPENDED, UNDISPATCHED)
-        resume(value)
+        resume(value, MODE_UNDISPATCHED)
     }
 
     override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
         val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
         check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
-        DECISION.compareAndSet(this@CancellableContinuationImpl, SUSPENDED, UNDISPATCHED)
-        resumeWithException(exception)
+        resumeWithException(exception, MODE_UNDISPATCHED)
+    }
+
+    private class CompletedIdempotentResult(
+        idempotentStart: Any?,
+        @JvmField val idempotentResume: Any?,
+        @JvmField val result: Any?,
+        @JvmField val token: Incomplete
+    ) : CompletedIdempotentStart(idempotentStart) {
+        override fun toString(): String = "CompletedIdempotentResult[$result]"
     }
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index b581336..ac7e1c4 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -94,8 +94,8 @@
 }
 
 internal class DispatchedContinuation<in T>(
-        val dispatcher: CoroutineDispatcher,
-        val continuation: Continuation<T>
+    @JvmField val dispatcher: CoroutineDispatcher,
+    @JvmField val continuation: Continuation<T>
 ): Continuation<T> by continuation {
     override fun resume(value: T) {
         val context = continuation.context
@@ -132,20 +132,15 @@
     }
 
     // used by "yield" implementation
-    fun resumeYield(job: Job?, value: T) {
+    internal fun dispatchYield(job: Job?, value: T) {
         val context = continuation.context
-        if (dispatcher.isDispatchNeeded(context))
-            dispatcher.dispatch(context, Runnable {
-                withCoroutineContext(context) {
-                    if (job?.isCompleted == true)
-                        continuation.resumeWithException(job.getCompletionException())
-                    else
-                        continuation.resume(value)
-                }
-            })
-        else
+        dispatcher.dispatch(context, Runnable {
             withCoroutineContext(context) {
-                continuation.resume(value)
+                if (job != null && job.isCompleted)
+                    continuation.resumeWithException(job.getCompletionException())
+                else
+                    continuation.resume(value)
             }
+        })
     }
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index 0ef0ffa..4630cc5 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -49,33 +49,48 @@
  * implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
  * It stores the result of continuation in the state of the job.
  *
- * @param context the new context for the coroutine. Use [newCoroutineContext] to create it.
  * @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
  * @suppress **This is unstable API and it is subject to change.**
  */
 public abstract class AbstractCoroutine<in T>(
-    context: CoroutineContext,
     active: Boolean
 ) : JobSupport(active), Continuation<T>, CoroutineScope {
-    @Suppress("LeakingThis")
-    override val context: CoroutineContext = context + this // merges this job into this context
+    // context must be Ok for unsafe publishing (it is persistent),
+    // so we don't mark this _context variable as volatile, but leave potential benign race here
+    private var _context: CoroutineContext? = null // created on first need
 
-    final override fun resume(value: T) {
+    @Suppress("LeakingThis")
+    public final override val context: CoroutineContext
+        get() = _context ?: createContext().also { _context = it }
+
+    protected abstract val parentContext: CoroutineContext
+
+    protected open fun createContext() = parentContext + this
+
+    protected open fun defaultResumeMode(): Int = 0
+
+    final override fun resume(value: T) = resume(value, defaultResumeMode())
+
+    protected fun resume(value: T, mode: Int) {
         while (true) { // lock-free loop on state
             val state = this.state // atomic read
             when (state) {
-                is Incomplete -> if (updateState(state, value)) return
+                is Incomplete -> if (updateState(state, value, mode)) return
                 is Cancelled -> return // ignore resumes on cancelled continuation
                 else -> throw IllegalStateException("Already resumed, but got value $value")
             }
         }
     }
 
-    final override fun resumeWithException(exception: Throwable) {
+    final override fun resumeWithException(exception: Throwable) = resumeWithException(exception, defaultResumeMode())
+
+    protected fun resumeWithException(exception: Throwable, mode: Int) {
         while (true) { // lock-free loop on state
             val state = this.state // atomic read
             when (state) {
-                is Incomplete -> if (updateState(state, CompletedExceptionally(exception))) return
+                is Incomplete -> {
+                    if (updateState(state, CompletedExceptionally(state.idempotentStart, exception), mode)) return
+                }
                 is Cancelled -> {
                     // ignore resumes on cancelled continuation, but handle exception if a different one is here
                     if (exception != state.exception) handleCoroutineException(context, exception)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 38356b6..9c75420 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -16,6 +16,9 @@
 
 package kotlinx.coroutines.experimental
 
+import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectInstance
 import kotlin.coroutines.experimental.CoroutineContext
 import kotlin.coroutines.experimental.startCoroutine
 
@@ -67,6 +70,12 @@
     public suspend fun await(): T
 
     /**
+     * Registers [onAwait][SelectBuilder.onAwait] select clause.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
+
+    /**
      * Returns *completed* result or throws [IllegalStateException] if this deferred value has not
      * [completed][isCompleted] yet. It throws the corresponding exception if this deferred has
      * [completed exceptionally][isCompletedExceptionally].
@@ -118,9 +127,9 @@
     async(context, block = block)
 
 private open class DeferredCoroutine<T>(
-    context: CoroutineContext,
+    override val parentContext: CoroutineContext,
     active: Boolean
-) : AbstractCoroutine<T>(context, active), Deferred<T> {
+) : AbstractCoroutine<T>(active), Deferred<T> {
     override val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
     override val isCancelled: Boolean get() = state is Cancelled
 
@@ -152,6 +161,25 @@
         })
     }
 
+    override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) {
+        if (select.isSelected) return // select is already selected -- nothing to register
+        val state = this.state
+        if (state is Incomplete) { // not completed yet -- defer to a completion handler
+            select.unregisterOnCompletion(invokeOnCompletion(SelectOnCompletion(this, select, block)))
+        } else // already completed -- try to select right away with the state read above
+            selectCompletion(select, block, state)
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    internal fun <R> selectCompletion(select: SelectInstance<R>, block: suspend (T) -> R, state: Any? = this.state) {
+        if (select.trySelect(idempotent = null)) { // proceed only if trySelect succeeds
+            if (state is CompletedExceptionally)
+                select.resumeSelectWithException(state.exception)
+            else
+                block.startUndispatchedCoroutine(state as T, select.completion) // any other state is taken as the value
+        }
+    }
+
     @Suppress("UNCHECKED_CAST")
     override fun getCompleted(): T {
         val state = this.state
@@ -161,10 +189,19 @@
     }
 }
 
+private class SelectOnCompletion<T, R>(
+    deferred: DeferredCoroutine<T>,
+    private val select: SelectInstance<R>,
+    private val block: suspend (T) -> R
+) : JobNode<DeferredCoroutine<T>>(deferred) {
+    override fun invoke(reason: Throwable?) = job.selectCompletion(select, block)
+    override fun toString(): String = "SelectOnCompletion[$select]"
+}
+
 private class LazyDeferredCoroutine<T>(
-        context: CoroutineContext,
-        val block: suspend CoroutineScope.() -> T
-) : DeferredCoroutine<T>(context, active = false) {
+    parentContext: CoroutineContext,
+    private val block: suspend CoroutineScope.() -> T
+) : DeferredCoroutine<T>(parentContext, active = false) {
     override fun onStart() {
         block.startCoroutine(this, this)
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index abd34bb..8c93e97 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -290,12 +290,16 @@
             return
         }
         // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
-        val newRegistration = parent.invokeOnCompletion(CancelOnCompletion(parent, this))
+        val newRegistration = parent.invokeOnCompletion(ParentOnCompletion(parent, this))
         registration = newRegistration
         // now check our state _after_ registering (see updateState order of actions)
         if (isCompleted) newRegistration.unregister()
     }
 
+    internal open fun onParentCompletion(cause: Throwable?) {
+        cancel()
+    }
+
     /**
      * Returns current state of this job.
      */
@@ -310,9 +314,9 @@
     /**
      * Tries to update current [state] of this job.
      */
-    internal fun updateState(expect: Any, update: Any?): Boolean {
+    internal fun updateState(expect: Any, update: Any?, mode: Int): Boolean {
         if (!tryUpdateState(expect, update)) return false
-        completeUpdateState(expect, update)
+        completeUpdateState(expect, update, mode)
         return true
     }
 
@@ -324,7 +328,7 @@
         return true // continues in completeUpdateState
     }
 
-    internal fun completeUpdateState(expect: Any, update: Any?) {
+    internal fun completeUpdateState(expect: Any, update: Any?, mode: Int) {
         // Invoke completion handlers
         val cause = (update as? CompletedExceptionally)?.cause
         var completionException: Throwable? = null
@@ -350,7 +354,7 @@
         // handle invokeOnCompletion exceptions
         completionException?.let { handleCompletionException(it) }
         // Do other (overridable) processing after completion handlers
-        afterCompletion(update)
+        afterCompletion(update, mode)
     }
 
     public final override val isActive: Boolean get() {
@@ -378,63 +382,110 @@
     // return: 0 -> false (not new), 1 -> true (started), -1 -> retry
     internal fun startInternal(state: Any?): Int {
         when {
-            // EMPTY_NEW state -- no completion handlers, new
-            state === EmptyNew -> {
+            state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
                 if (!STATE.compareAndSet(this, state, EmptyActive)) return -1
                 onStart()
                 return 1
             }
-            // LIST -- a list of completion handlers (either new or active)
-            state is NodeList -> {
+            state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
                 if (state.isActive) return 0
                 if (!NodeList.ACTIVE.compareAndSet(state, null, NodeList.ACTIVE_STATE)) return -1
                 onStart()
                 return 1
             }
-            // not a new state
-            else -> return 0
+            else -> return 0 // not a new state
         }
     }
 
-    internal fun describeStart(failureMarker: Any): AtomicDesc =
-        object : AtomicDesc() {
-            override fun prepare(op: AtomicOp): Any? {
-                while (true) { // lock-free loop on state
-                    val state = this@JobSupport._state
-                    when {
-                        state === op -> return null // already in progress
-                        state is OpDescriptor -> state.perform(this@JobSupport) // help
-                        state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
-                            if (STATE.compareAndSet(this@JobSupport, state, op)) return null // success
+    // it is just like start(), but supports idempotent start (same non-null marker --> same `true` result)
+    public fun trySelect(idempotent: Any?): Boolean {
+        if (idempotent == null) return start() // non idempotent -- use plain start
+        check(idempotent !is OpDescriptor) { "cannot use OpDescriptor as idempotent marker"}
+        while (true) { // lock-free loop on state
+            val state = this.state
+            when {
+                state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
+                    // try to promote it to list in new state
+                    STATE.compareAndSet(this, state, NodeList(active = false)) // outcome ignored, the loop re-reads state
+                }
+                state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+                    val active = state.active
+                    if (active === idempotent) return true // was activated with the same marker --> true
+                    if (active != null) return false // holds some other non-null value -- this marker cannot start it
+                    if (NodeList.ACTIVE.compareAndSet(state, null, idempotent)) { // on failure the loop retries
+                        onStart()
+                        return true
+                    }
+                }
+                state is CompletedIdempotentStart -> { // remembers idempotent start token
+                    return state.idempotentStart === idempotent
+                }
+                else -> return false // any other state is not started by this call
+            }
+        }
+    }
+
+    public fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
+    public fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
+
+    private inner class AtomicSelectOp( // installs itself into the "new" state while desc is being performed
+        @JvmField val desc: AtomicDesc,
+        @JvmField val activate: Boolean // true -> job becomes active on success; false -> "new" state is restored
+    ) : AtomicOp () {
+        override fun prepare(): Any? = prepareIfNotSelected() ?: desc.prepare(this)
+
+        override fun complete(affected: Any?, failure: Any?) {
+            completeSelect(failure)
+            desc.complete(this, failure)
+        }
+
+        fun prepareIfNotSelected(): Any? {
+            while (true) { // lock-free loop on state
+                val state = _state
+                when {
+                    state === this@AtomicSelectOp -> return null // already in progress
+                    state is OpDescriptor -> state.perform(this@JobSupport) // help
+                    state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
+                        if (STATE.compareAndSet(this@JobSupport, state, this@AtomicSelectOp)) return null // success
+                    }
+                    state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+                        val active = state._active
+                        when {
+                            active == null -> {
+                                if (NodeList.ACTIVE.compareAndSet(state, null, this@AtomicSelectOp)) return null // success
+                            }
+                            active === this@AtomicSelectOp -> return null // already in progress
+                            active is OpDescriptor -> active.perform(state) // help
+                            else -> return ALREADY_SELECTED // active state
                         }
-                        state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
-                            if (state.isActive) return failureMarker
-                            if (NodeList.ACTIVE.compareAndSet(state, null, op)) return null // success
+                    }
+                    else -> return ALREADY_SELECTED // not a new state
+                }
+            }
+        }
+
+        private fun completeSelect(failure: Any?) {
+            val activated = failure == null && activate // only a successful activating op starts the job
+            val state = _state
+            when {
+                state === this -> {
+                    val update = if (activated) EmptyActive else EmptyNew
+                    if (STATE.compareAndSet(this@JobSupport, this, update)) {
+                        if (activated) onStart() // never run the start action for a job that is left in "new" state
+                    }
+                }
+                state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+                    if (state._active === this) {
+                        val update = if (activated) NodeList.ACTIVE_STATE else null
+                        if (NodeList.ACTIVE.compareAndSet(state, this, update)) {
+                            if (activated) onStart() // same rule for the list-based state
                         }
-                        else -> return failureMarker // not a new state
                     }
                 }
             }
 
-            override fun complete(op: AtomicOp, failure: Any?) {
-                val success = failure == null
-                val state = this@JobSupport._state
-                when {
-                    state === op -> {
-                        if (STATE.compareAndSet(this@JobSupport, op, if (success) EmptyActive else EmptyNew)) {
-                            if (success) onStart()
-                        }
-                    }
-                    state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
-                        if (state._active === op) {
-                            if (NodeList.ACTIVE.compareAndSet(state, op, if (success) NodeList.ACTIVE_STATE else null)) {
-                                if (success) onStart()
-                            }
-                        }
-                    }
-                }
-            }
         }
+    }
 
     /**
      * Override to provide the actual [start] action.
@@ -457,19 +508,16 @@
         while (true) { // lock-free loop on state
             val state = this.state
             when {
-                // EMPTY_ACTIVE state -- no completion handlers, active
-                state === EmptyActive -> {
+                state === EmptyActive -> { // EMPTY_ACTIVE state -- no completion handlers, active
                     // try move to SINGLE state
                     val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
                     if (STATE.compareAndSet(this, state, node)) return node
                 }
-                // EMPTY_NEW state -- no completion handlers, new
-                state === EmptyNew -> {
+                state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
                     // try to promote it to list in new state
                     STATE.compareAndSet(this, state, NodeList(active = false))
                 }
-                // SINGLE/SINGLE+ state -- one completion handler
-                state is JobNode<*> -> {
+                state is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
                     // try to promote it to list (SINGLE+ state)
                     state.addOneIfEmpty(NodeList(active = true))
                     // it must be in SINGLE+ state or state has changed (node could have need removed from state)
@@ -477,13 +525,11 @@
                     // just attempt converting it to list if state is still the same, then continue lock-free loop
                     STATE.compareAndSet(this, state, list)
                 }
-                // LIST -- a list of completion handlers (either new or active)
-                state is NodeList -> {
+                state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
                     val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
                     if (state.addLastIf(node) { this.state === state }) return node
                 }
-                // is inactive
-                else -> {
+                else -> { // is inactive
                     handler((state as? CompletedExceptionally)?.exception)
                     return EmptyRegistration
                 }
@@ -529,7 +575,7 @@
     final override fun cancel(cause: Throwable?): Boolean {
         while (true) { // lock-free loop on state
             val state = this.state as? Incomplete ?: return false // quit if already complete
-            if (updateState(state, Cancelled(cause))) return true
+            if (updateState(state, Cancelled(state.idempotentStart, cause), mode = 0)) return true
         }
     }
 
@@ -543,7 +589,7 @@
     /**
      * Override for post-completion actions that need to do something with the state.
      */
-    protected open fun afterCompletion(state: Any?) {}
+    protected open fun afterCompletion(state: Any?, mode: Int) {}
 
     private fun makeNode(handler: CompletionHandler): JobNode<*> =
             (handler as? JobNode<*>)?.also { require(it.job === this) }
@@ -557,22 +603,31 @@
      */
     public interface Incomplete {
         val isActive: Boolean
+        val idempotentStart: Any? // != null if this state is a descendant of trySelect(idempotent)
     }
 
     private class NodeList(
         active: Boolean
     ) : LockFreeLinkedListHead(), Incomplete {
         @Volatile
+        @JvmField
         var _active: Any? = if (active) ACTIVE_STATE else null
 
-        override val isActive: Boolean get() {
+        val active: Any? get() {
             while (true) { // helper loop for atomic ops
                 val active = this._active
-                if (active !is OpDescriptor) return active != null
+                if (active !is OpDescriptor) return active
                 active.perform(this)
             }
         }
 
+        override val isActive: Boolean get() = active != null
+
+        override val idempotentStart: Any? get() {
+            val active = this.active
+            return if (active === ACTIVE_STATE) null else active
+        }
+
         companion object {
             @JvmStatic
             val ACTIVE: AtomicReferenceFieldUpdater<NodeList, Any?> =
@@ -595,13 +650,20 @@
         }
     }
 
+    public open class CompletedIdempotentStart(
+        @JvmField val idempotentStart: Any?
+    )
+
     /**
      * Class for a [state] of a job that had completed exceptionally, including cancellation.
      *
      * @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
      *        if created on first get from [exception] property.
      */
-    public open class CompletedExceptionally(val cause: Throwable?) {
+    public open class CompletedExceptionally(
+        idempotentStart: Any?,
+        @JvmField val cause: Throwable?
+    ) : CompletedIdempotentStart(idempotentStart) {
         @Volatile
         private var _exception: Throwable? = cause // materialize CancellationException on first need
 
@@ -618,20 +680,27 @@
     /**
      * A specific subclass of [CompletedExceptionally] for cancelled jobs.
      */
-    public class Cancelled(cause: Throwable?) : CompletedExceptionally(cause)
+    public class Cancelled(
+        idempotentStart: Any?,
+        cause: Throwable?
+    ) : CompletedExceptionally(idempotentStart, cause)
 }
 
+internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")
+
 private val EmptyNew = Empty(false)
 private val EmptyActive = Empty(true)
 
 private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
+    override val idempotentStart: Any? get() = null
     override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
 }
 
 internal abstract class JobNode<out J : Job>(
-    val job: J
+    @JvmField val job: J
 ) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Incomplete {
     final override val isActive: Boolean get() = true
+    final override val idempotentStart: Any? get() = null
     // if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
     // directly without wrapping
     final override fun unregister() = (job as JobSupport).removeNode(this)
@@ -640,7 +709,7 @@
 
 private class InvokeOnCompletion(
     job: Job,
-    val handler: CompletionHandler
+    @JvmField val handler: CompletionHandler
 ) : JobNode<Job>(job)  {
     override fun invoke(reason: Throwable?) = handler.invoke(reason)
     override fun toString() = "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
@@ -648,15 +717,15 @@
 
 private class ResumeOnCompletion(
     job: Job,
-    val continuation: Continuation<Unit>
+    @JvmField val continuation: Continuation<Unit>
 ) : JobNode<Job>(job)  {
     override fun invoke(reason: Throwable?) = continuation.resume(Unit)
     override fun toString() = "ResumeOnCompletion[$continuation]"
 }
 
-private class UnregisterOnCompletion(
+internal class UnregisterOnCompletion(
     job: Job,
-    val registration: Job.Registration
+    @JvmField val registration: Job.Registration
 ) : JobNode<Job>(job) {
     override fun invoke(reason: Throwable?) = registration.unregister()
     override fun toString(): String = "UnregisterOnCompletion[$registration]"
@@ -664,15 +733,23 @@
 
 private class CancelOnCompletion(
     parentJob: Job,
-    val subordinateJob: Job
+    @JvmField val subordinateJob: Job
 ) : JobNode<Job>(parentJob) {
     override fun invoke(reason: Throwable?) { subordinateJob.cancel(reason) }
     override fun toString(): String = "CancelOnCompletion[$subordinateJob]"
 }
 
+private class ParentOnCompletion(
+    parentJob: Job,
+    @JvmField val subordinateJob: JobSupport
+) : JobNode<Job>(parentJob) {
+    override fun invoke(reason: Throwable?) { subordinateJob.onParentCompletion(reason) }
+    override fun toString(): String = "ParentOnCompletion[$subordinateJob]"
+}
+
 private class CancelFutureOnCompletion(
     job: Job,
-    val future: Future<*>
+    @JvmField val future: Future<*>
 ) : JobNode<Job>(job)  {
     override fun invoke(reason: Throwable?) {
         // Don't interrupt when cancelling future on completion, because no one is going to reset this
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
index f9485b0..014f555 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
@@ -16,6 +16,9 @@
 
 package kotlinx.coroutines.experimental
 
+import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
+import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
+
 /**
  * Yields a thread (or thread pool) of the current coroutine dispatcher to other coroutines to run.
  * If the coroutine dispatcher does not have its own thread pool (like [Unconfined] dispatcher) then this
@@ -24,6 +27,12 @@
  * If the [Job] of the current coroutine is completed when this suspending function is invoked or while
  * this function is waiting for dispatching, it resumes with [CancellationException].
  */
-suspend fun yield(): Unit = suspendCancellableCoroutine sc@ { cont ->
-    (cont as CancellableContinuationImpl).resumeYield(Unit)
+suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
+    val context = cont.context
+    val job = context[Job]
+    if (job != null && job.isCompleted) throw job.getCompletionException()
+    if (cont !is DispatchedContinuation<Unit>) return@sc Unit
+    if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
+    cont.dispatchYield(job, Unit)
+    COROUTINE_SUSPENDED
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index a335f0e..7ae65f4 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -16,14 +16,16 @@
 
 package kotlinx.coroutines.experimental.channels
 
-import kotlinx.coroutines.experimental.CancellableContinuation
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
-import kotlinx.coroutines.experimental.removeOnCancel
-import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
+import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlin.coroutines.experimental.startCoroutine
 
 /**
  * Abstract channel. It is a base class for buffered and unbuffered channels.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
  */
 public abstract class AbstractChannel<E> : Channel<E> {
     private val queue = LockFreeLinkedListHead()
@@ -52,11 +54,22 @@
     protected abstract fun offerInternal(element: E): Any
 
     /**
+     * Tries to add the element to the buffer or to a queued receiver if the select clause has not been selected yet.
+     * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+     */
+    protected abstract fun offerSelectInternal(element: E, select: SelectInstance<*>): Any
+
+    /**
      * Tries to remove element from buffer or from queued sender.
-     * Return type is `E | POLL_EMPTY | Closed`
+     * Return type is `E | POLL_FAILED | Closed`
      */
     protected abstract fun pollInternal(): Any?
 
+    /**
+     * Tries to remove an element from the buffer or from a queued sender if the select clause has not been selected yet.
+     * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+     */
+    protected abstract fun pollSelectInternal(select: SelectInstance<*>): Any?
 
     // ------ state functions for concrete implementations ------
 
@@ -68,7 +81,7 @@
     /**
      * Returns non-null closed token if it is last in the queue.
      */
-    protected val closedForSend: Any? get() = queue.prev as? Closed<*>
+    protected val closedForSend: ReceiveOrClosed<*>? get() = queue.prev as? Closed<*>
 
     // ------ SendChannel ------
 
@@ -86,13 +99,14 @@
         val result = offerInternal(element)
         return when {
             result === OFFER_SUCCESS -> true
+            result === OFFER_FAILED -> false
             result is Closed<*> -> throw result.sendException
-            else -> false
+            else -> error("offerInternal returned $result")
         }
     }
 
     private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
-        val send = SendElement(cont, element)
+        val send = SendElement(element, cont)
         loop@ while (true) {
             if (enqueueSend(send)) {
                 cont.initCancellability() // make it properly cancellable
@@ -106,10 +120,12 @@
                     cont.resume(Unit)
                     return@sc
                 }
+                result === OFFER_FAILED -> continue@loop
                 result is Closed<*> -> {
                     cont.resumeWithException(result.sendException)
                     return@sc
                 }
+                else -> error("offerInternal returned $result")
             }
         }
     }
@@ -147,7 +163,82 @@
      * Retrieves first receiving waiter from the queue or returns closed token.
      */
     protected fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
-        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
+
+    // ------ registerSelectSend ------
+
+    protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
+
+    protected class TryOfferDesc<E>(
+        @JvmField val element: E,
+        queue: LockFreeLinkedListHead
+    ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
+        @JvmField var resumeToken: Any? = null
+
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
+            if (affected is Closed<*>) return affected
+            return null
+        }
+
+        override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
+            val token = node.tryResumeReceive(element, idempotent = this) ?: return false
+            resumeToken = token
+            return true
+        }
+    }
+
+    private inner class TryEnqueueSendDesc<E, R>(
+        element: E,
+        select: SelectInstance<R>,
+        block: suspend () -> R
+    ) : AddLastDesc(queue, SendSelect(element, select, block)) {
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected is ReceiveOrClosed<*>) {
+                return affected as? Closed<*> ?: ENQUEUE_FAILED
+            }
+            return null
+        }
+
+        override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+            if (!isBufferFull) return ENQUEUE_FAILED
+            return super.onPrepare(affected, next)
+        }
+
+        override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+            super.finishOnSuccess(affected, next)
+            // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+            (node as SendSelect<*>).removeOnSelectCompletion()
+        }
+    }
+
+    override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+        while (true) {
+            if (select.isSelected) return
+            if (isFull) {
+                val enqueueOp = TryEnqueueSendDesc(element, select, block)
+                val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+                when {
+                    enqueueResult === ALREADY_SELECTED -> return
+                    enqueueResult === ENQUEUE_FAILED -> {} // retry
+                    enqueueResult is Closed<*> -> throw enqueueResult.sendException
+                    else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
+                }
+            } else {
+                val offerResult = offerSelectInternal(element, select)
+                when {
+                    offerResult === ALREADY_SELECTED -> return
+                    offerResult === OFFER_FAILED -> {} // retry
+                    offerResult === OFFER_SUCCESS -> {
+                        block.startUndispatchedCoroutine(select.completion)
+                        return
+                    }
+                    offerResult is Closed<*> -> throw offerResult.sendException
+                    else -> error("offerSelectInternal returned $offerResult")
+                }
+            }
+        }
+    }
 
     // ------ ReceiveChannel ------
 
@@ -158,7 +249,7 @@
     public final override suspend fun receive(): E {
         // fast path -- try poll non-blocking
         val result = pollInternal()
-        if (result !== POLL_EMPTY) return receiveResult(result)
+        if (result !== POLL_FAILED) return receiveResult(result)
         // slow-path does suspend
         return receiveSuspend()
     }
@@ -171,7 +262,7 @@
 
     @Suppress("UNCHECKED_CAST")
     private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
-        val receive = ReceiveNonNull(cont)
+        val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
         while (true) {
             if (enqueueReceive(receive)) {
                 cont.initCancellability() // make it properly cancellable
@@ -184,7 +275,7 @@
                 cont.resumeWithException(result.receiveException)
                 return@sc
             }
-            if (result !== POLL_EMPTY) {
+            if (result !== POLL_FAILED) {
                 cont.resume(result as E)
                 return@sc
             }
@@ -203,7 +294,7 @@
     public final override suspend fun receiveOrNull(): E? {
         // fast path -- try poll non-blocking
         val result = pollInternal()
-        if (result !== POLL_EMPTY) return receiveOrNullResult(result)
+        if (result !== POLL_FAILED) return receiveOrNullResult(result)
         // slow-path does suspend
         return receiveOrNullSuspend()
     }
@@ -211,7 +302,7 @@
     @Suppress("UNCHECKED_CAST")
     private fun receiveOrNullResult(result: Any?): E? {
         if (result is Closed<*>) {
-            if (result.closeCause != null) throw result.receiveException
+            if (result.closeCause != null) throw result.closeCause
             return null
         }
         return result as E
@@ -219,7 +310,7 @@
 
     @Suppress("UNCHECKED_CAST")
     private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
-        val receive = ReceiveOrNull(cont)
+        val receive = ReceiveElement(cont, nullOnClose = true)
         while (true) {
             if (enqueueReceive(receive)) {
                 cont.initCancellability() // make it properly cancellable
@@ -232,10 +323,10 @@
                 if (result.closeCause == null)
                     cont.resume(null)
                 else
-                    cont.resumeWithException(result.receiveException)
+                    cont.resumeWithException(result.closeCause)
                 return@sc
             }
-            if (result !== POLL_EMPTY) {
+            if (result !== POLL_FAILED) {
                 cont.resume(result as E)
                 return@sc
             }
@@ -245,7 +336,7 @@
     @Suppress("UNCHECKED_CAST")
     public final override fun poll(): E? {
         val result = pollInternal()
-        return if (result === POLL_EMPTY) null else receiveOrNullResult(result)
+        return if (result === POLL_FAILED) null else receiveOrNullResult(result)
     }
 
     public final override fun iterator(): ChannelIterator<E> = Iterator(this)
@@ -256,27 +347,139 @@
     protected fun takeFirstSendOrPeekClosed(): Send? =
         queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
 
-    protected companion object {
-        const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+    // ------ registerSelectReceive ------
 
-        val OFFER_SUCCESS: Any = Marker("OFFER_SUCCESS")
-        val OFFER_FAILED: Any = Marker("OFFER_FAILED")
+    protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
 
-        val POLL_EMPTY: Any = Marker("POLL_EMPTY")
+    protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
+        @JvmField var resumeToken: Any? = null
+        @JvmField var pollResult: E? = null
 
-        fun isClosed(result: Any?): Boolean = result is Closed<*>
-    }
-
-    // for debugging
-    private class Marker(val string: String) {
-        override fun toString(): String = string
-    }
-
-    private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
-        cont.invokeOnCompletion {
-            if (cont.isCancelled && receive.remove())
-                onCancelledReceive()
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected is Closed<*>) return affected
+            if (affected !is Send) return POLL_FAILED
+            return null
         }
+
+        @Suppress("UNCHECKED_CAST")
+        override fun validatePrepared(node: Send): Boolean {
+            val token = node.tryResumeSend(this) ?: return false
+            resumeToken = token
+            pollResult = node.pollResult as E
+            return true
+        }
+    }
+
+    private inner class TryEnqueueReceiveDesc<E, R>(
+        select: SelectInstance<R>,
+        block: suspend (E?) -> R,
+        nullOnClose: Boolean
+    ) : AddLastDesc(queue, ReceiveSelect(select, block, nullOnClose)) {
+        override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+            if (affected is Send) return ENQUEUE_FAILED
+            return null
+        }
+
+        override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+            if (!isBufferEmpty) return ENQUEUE_FAILED
+            return super.onPrepare(affected, next)
+        }
+
+        override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+            super.finishOnSuccess(affected, next)
+            // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+            (node as ReceiveSelect<*, *>).removeOnSelectCompletion()
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
+        while (true) {
+            if (select.isSelected) return
+            if (isEmpty) {
+                val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
+                val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+                when {
+                    enqueueResult === ALREADY_SELECTED -> return
+                    enqueueResult === ENQUEUE_FAILED -> {} // retry
+                    else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+                }
+            } else {
+                val pollResult = pollSelectInternal(select)
+                when {
+                    pollResult === ALREADY_SELECTED -> return
+                    pollResult === POLL_FAILED -> {} // retry
+                    pollResult is Closed<*> -> throw pollResult.receiveException
+                    else -> {
+                        block.startUndispatchedCoroutine(pollResult as E, select.completion)
+                        return
+                    }
+                }
+            }
+        }
+    }
+
+    @Suppress("UNCHECKED_CAST")
+    override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
+        while (true) {
+            if (select.isSelected) return
+            if (isEmpty) {
+                val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
+                val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+                when {
+                    enqueueResult === ALREADY_SELECTED -> return
+                    enqueueResult === ENQUEUE_FAILED -> {} // retry
+                    else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+                }
+            } else {
+                val pollResult = pollSelectInternal(select)
+                when {
+                    pollResult === ALREADY_SELECTED -> return
+                    pollResult === POLL_FAILED -> {} // retry
+                    pollResult is Closed<*> -> {
+                        if (pollResult.closeCause == null) {
+                            if (select.trySelect(idempotent = null))
+                                block.startUndispatchedCoroutine(null, select.completion)
+                            return
+                        } else
+                            throw pollResult.closeCause
+                    }
+                    else -> {
+                        // selected successfully
+                        block.startUndispatchedCoroutine(pollResult as E, select.completion)
+                        return
+                    }
+                }
+            }
+        }
+    }
+
+    // ------ protected ------
+
+    protected companion object {
+        private const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+
+        @JvmStatic
+        val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+        @JvmStatic
+        val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+
+        @JvmStatic
+        val POLL_FAILED: Any = Symbol("POLL_FAILED")
+
+        @JvmStatic
+        val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+
+        @JvmStatic
+        private val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
+        @JvmStatic
+        private val NULL_VALUE: Any = Symbol("NULL_VALUE")
+
+        @JvmStatic
+        private val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
+
+        @JvmStatic
+        fun isClosed(result: Any?): Boolean = result is Closed<*>
     }
 
     /**
@@ -289,15 +492,24 @@
      */
     protected open fun onCancelledReceive() {}
 
+    // ------ private ------
+
+    private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
+        cont.invokeOnCompletion {
+            if (cont.isCancelled && receive.remove())
+                onCancelledReceive()
+        }
+    }
+
     private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
-        var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
+        var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
 
         suspend override fun hasNext(): Boolean {
             // check for repeated hasNext
-            if (result !== POLL_EMPTY) return hasNextResult(result)
+            if (result !== POLL_FAILED) return hasNextResult(result)
             // fast path -- try poll non-blocking
             result = channel.pollInternal()
-            if (result !== POLL_EMPTY) return hasNextResult(result)
+            if (result !== POLL_FAILED) return hasNextResult(result)
             // slow-path does suspend
             return hasNextSuspend()
         }
@@ -328,7 +540,7 @@
                         cont.resumeWithException(result.receiveException)
                     return@sc
                 }
-                if (result !== POLL_EMPTY) {
+                if (result !== POLL_FAILED) {
                     cont.resume(true)
                     return@sc
                 }
@@ -339,8 +551,8 @@
         suspend override fun next(): E {
             val result = this.result
             if (result is Closed<*>) throw result.receiveException
-            if (result !== POLL_EMPTY) {
-                this.result = POLL_EMPTY
+            if (result !== POLL_FAILED) {
+                this.result = POLL_FAILED
                 return result as E
             }
             // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
@@ -353,7 +565,7 @@
      */
     protected interface Send {
         val pollResult: Any? // E | Closed
-        fun tryResumeSend(): Any?
+        fun tryResumeSend(idempotent: Any?): Any?
         fun completeResumeSend(token: Any)
     }
 
@@ -362,31 +574,59 @@
      */
     protected interface ReceiveOrClosed<in E> {
         val offerResult: Any // OFFER_SUCCESS | Closed
-        fun tryResumeReceive(value: E): Any?
+        fun tryResumeReceive(value: E, idempotent: Any?): Any?
         fun completeResumeReceive(token: Any)
     }
 
     @Suppress("UNCHECKED_CAST")
     private class SendElement(
-            val cont: CancellableContinuation<Unit>,
-            override val pollResult: Any?
+        override val pollResult: Any?,
+        @JvmField val cont: CancellableContinuation<Unit>
     ) : LockFreeLinkedListNode(), Send {
-        override fun tryResumeSend(): Any? = cont.tryResume(Unit)
+        override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
         override fun completeResumeSend(token: Any) = cont.completeResume(token)
+        override fun toString(): String = "SendElement($pollResult)[$cont]"
     }
 
-    private class Closed<in E>(
-        val closeCause: Throwable?
+    private class SendSelect<R>(
+        override val pollResult: Any?,
+        @JvmField val select: SelectInstance<R>,
+        @JvmField val block: suspend () -> R
+    ) : LockFreeLinkedListNode(), Send, CompletionHandler {
+        override fun tryResumeSend(idempotent: Any?): Any? =
+            if (select.trySelect(idempotent)) SELECT_STARTED else null
+
+        override fun completeResumeSend(token: Any) {
+            check(token === SELECT_STARTED)
+            block.startCoroutine(select.completion)
+        }
+
+        fun removeOnSelectCompletion() {
+            select.invokeOnCompletion(this)
+        }
+
+        override fun invoke(cause: Throwable?) {
+            remove()
+        }
+
+        override fun toString(): String = "SendSelect($pollResult)[$select]"
+    }
+
+    /**
+     * Represents closed channel.
+     */
+    protected class Closed<in E>(
+        @JvmField val closeCause: Throwable?
     ) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
         @Volatile
-        var _sendException: Throwable? = null
+        private var _sendException: Throwable? = null
 
         val sendException: Throwable get() = _sendException ?:
             (closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE))
                     .also { _sendException = it }
 
         @Volatile
-        var _receiveException: Throwable? = null
+        private var _receiveException: Throwable? = null
 
         val receiveException: Throwable get() = _receiveException ?:
             (closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE))
@@ -394,10 +634,11 @@
 
         override val offerResult get() = this
         override val pollResult get() = this
-        override fun tryResumeSend(): Boolean = true
-        override fun completeResumeSend(token: Any) {}
-        override fun tryResumeReceive(value: E): Any? = throw sendException
+        override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
+        override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
+        override fun tryResumeReceive(value: E, idempotent: Any?): Any? = throw sendException
         override fun completeResumeReceive(token: Any) = throw sendException
+        override fun toString(): String = "Closed[$closeCause]"
     }
 
     private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
@@ -405,34 +646,51 @@
         abstract fun resumeReceiveClosed(closed: Closed<*>)
     }
 
-    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
-        override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
-        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
-        override fun resumeReceiveClosed(closed: Closed<*>) = cont.resumeWithException(closed.receiveException)
-    }
-
-    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
-        override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+    private class ReceiveElement<in E>(
+        @JvmField val cont: CancellableContinuation<E?>,
+        @JvmField val nullOnClose: Boolean
+    ) : Receive<E>() {
+        override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
         override fun completeResumeReceive(token: Any) = cont.completeResume(token)
         override fun resumeReceiveClosed(closed: Closed<*>) {
-            if (closed.closeCause == null)
+            if (closed.closeCause == null && nullOnClose)
                 cont.resume(null)
             else
                 cont.resumeWithException(closed.receiveException)
         }
+        override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
     }
 
+    private class IdempotentTokenValue<out E>( // pairs a resume token with the received element (see ReceiveHasNext)
+        @JvmField val token: Any, // token returned by cont.tryResume in ReceiveHasNext.tryResumeReceive
+        @JvmField val value: E // element that ReceiveHasNext.completeResumeReceive stores into iterator.result
+    )
+
     private class ReceiveHasNext<E>(
-            val iterator: Iterator<E>,
-            val cont: CancellableContinuation<Boolean>
+        @JvmField val iterator: Iterator<E>,
+        @JvmField val cont: CancellableContinuation<Boolean>
     ) : Receive<E>() {
-        override fun tryResumeReceive(value: E): Any? {
-            val token = cont.tryResume(true)
-            if (token != null) iterator.result = value
+        override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
+            val token = cont.tryResume(true, idempotent)
+            if (token != null) {
+                /*
+                   When idempotent != null this invocation can be stale and we cannot directly update iterator.result
+                   Instead, we save both token & result into a temporary IdempotentTokenValue object and
+                   set iterator result only in completeResumeReceive that is going to be invoked just once
+                 */
+                if (idempotent != null) return IdempotentTokenValue(token, value)
+                iterator.result = value
+            }
             return token
         }
 
-        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+        override fun completeResumeReceive(token: Any) {
+            if (token is IdempotentTokenValue<*>) {
+                iterator.result = token.value
+                cont.completeResume(token.token)
+            } else
+                cont.completeResume(token)
+        }
 
         override fun resumeReceiveClosed(closed: Closed<*>) {
             val token = if (closed.closeCause == null)
@@ -444,5 +702,40 @@
                 cont.completeResume(token)
             }
         }
+        override fun toString(): String = "ReceiveHasNext[$cont]"
+    }
+
+    private class ReceiveSelect<R, in E>( // queued receiver that belongs to a select clause
+        @JvmField val select: SelectInstance<R>, // the select this clause competes in
+        @JvmField val block: suspend (E?) -> R, // clause body; started with the received element, or with null on close
+        @JvmField val nullOnClose: Boolean // true: a close without cause starts block(null) instead of failing the select
+    ) : Receive<E>(), CompletionHandler {
+        override fun tryResumeReceive(value: E, idempotent: Any?): Any?  = // token is the element itself (NULL_VALUE stands in for null); null when trySelect fails
+            if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
+
+        @Suppress("UNCHECKED_CAST")
+        override fun completeResumeReceive(token: Any) { // second step: decode the token and run the clause body
+            val value: E = (if (token === NULL_VALUE) null else token) as E // NULL_VALUE decodes back to null
+            block.startCoroutine(value, select.completion) // the block's outcome is delivered to select.completion
+        }
+
+        override fun resumeReceiveClosed(closed: Closed<*>) { // called when the channel turned out to be closed
+            if (select.trySelect(idempotent = null)) { // nothing happens unless this clause gets selected
+                if (closed.closeCause == null && nullOnClose) { // closed without a cause and the clause accepts null
+                    block.startCoroutine(null, select.completion)
+                } else
+                    select.resumeSelectWithException(closed.receiveException) // otherwise the select fails with the receive exception
+            }
+        }
+
+        fun removeOnSelectCompletion() { // hooks this node's invoke() onto the select
+            select.invokeOnCompletion(this) // `this` is the CompletionHandler; its invoke() calls remove()
+        }
+
+        override fun invoke(cause: Throwable?) { // CompletionHandler callback; `cause` is not used
+            remove() // NOTE(review): presumably unlinks this waiting node from the channel queue -- confirm
+        }
+
+        override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]" // debug form
     }
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index b5265fa..c75f1bb 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -16,6 +16,8 @@
 
 package kotlinx.coroutines.experimental.channels
 
+import kotlinx.coroutines.experimental.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
 import java.util.concurrent.locks.ReentrantLock
 
 /**
@@ -53,8 +55,8 @@
 
     // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
     override fun offerInternal(element: E): Any {
-        var token: Any? = null
         var receive: ReceiveOrClosed<E>? = null
+        var token: Any? = null
         locked {
             val size = this.size
             closedForSend?.let { return it }
@@ -63,9 +65,13 @@
                 this.size = size + 1 // update size before checking queue (!!!)
                 // check for receivers that were waiting on empty queue
                 if (size == 0) {
-                    while (true) {
-                        receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
-                        token = receive!!.tryResumeReceive(element)
+                    loop@ while (true) {
+                        receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
+                        if (receive is Closed) {
+                            this.size = size // restore size
+                            return receive!!
+                        }
+                        token = receive!!.tryResumeReceive(element, idempotent = null)
                         if (token != null) {
                             this.size = size // restore size
                             return@locked
@@ -83,31 +89,79 @@
         return receive!!.offerResult
     }
 
-    // result is `E | POLL_EMPTY | Closed`
-    override fun pollInternal(): Any? {
+    // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+    override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any { // offer variant that must also win `select`
+        var receive: ReceiveOrClosed<E>? = null // set only when the element is handed straight to a receiver
         var token: Any? = null
+        locked {
+            val size = this.size
+            closedForSend?.let { return it } // non-null closedForSend is returned to the caller as the result
+            if (size < capacity) {
+                // tentatively put element to buffer
+                this.size = size + 1 // update size before checking queue (!!!)
+                // check for receivers that were waiting on empty queue
+                if (size == 0) {
+                    loop@ while (true) { // every branch below exits the loop, so the body runs at most once
+                        val offerOp = describeTryOffer(element)
+                        val failure = select.performAtomicTrySelect(offerOp) // null result means success
+                        when {
+                            failure == null -> { // offered successfully
+                                this.size = size // restore size
+                                receive = offerOp.result
+                                token = offerOp.resumeToken
+                                check(token != null) // a successful offer must have produced a resume token
+                                return@locked // leave the lock; completion happens after the locked block
+                            }
+                            failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
+                            failure === ALREADY_SELECTED || failure is Closed<*> -> {
+                                this.size = size // restore size
+                                return failure
+                            }
+                            else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+                        }
+                    }
+                }
+                // let's try to select sending this element to buffer
+                if (!select.trySelect(null)) {
+                    this.size = size // restore size
+                    return ALREADY_SELECTED
+                }
+                buffer[(head + size) % capacity] = element // actually queue element
+                return OFFER_SUCCESS
+            }
+            // size == capacity: full
+            return OFFER_FAILED
+        }
+        // breaks here if offer meets receiver
+        receive!!.completeResumeReceive(token!!) // runs after the locked block; both values were set on the success path
+        return receive!!.offerResult
+    }
+
+    // result is `E | POLL_FAILED | Closed`
+    override fun pollInternal(): Any? {
         var send: Send? = null
+        var token: Any? = null
         var result: Any? = null
         locked {
             val size = this.size
-            if (size == 0) return closedForSend ?: POLL_EMPTY
+            if (size == 0) return closedForSend ?: POLL_FAILED
             // size > 0: not empty -- retrieve element
             result = buffer[head]
             buffer[head] = null
             this.size = size - 1 // update size before checking queue (!!!)
             // check for senders that were waiting on full queue
-            var replacement: Any? = POLL_EMPTY
+            var replacement: Any? = POLL_FAILED
             if (size == capacity) {
-                while (true) {
+                loop@ while (true) {
                     send = takeFirstSendOrPeekClosed() ?: break
-                    token = send!!.tryResumeSend()
+                    token = send!!.tryResumeSend(idempotent = null)
                     if (token != null) {
                         replacement = send!!.pollResult
-                        break
+                        break@loop
                     }
                 }
             }
-            if (replacement !== POLL_EMPTY && !isClosed(replacement)) {
+            if (replacement !== POLL_FAILED && !isClosed(replacement)) {
                 this.size = size // restore size
                 buffer[(head + size) % capacity] = replacement
             }
@@ -118,4 +172,65 @@
             send!!.completeResumeSend(token!!)
         return result
     }
+
+    // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+    override fun pollSelectInternal(select: SelectInstance<*>): Any? { // poll variant that must also win `select`
+        var send: Send? = null // sender (or Closed token) whose resume must be completed after the lock
+        var token: Any? = null // resume token for `send`; stays null when no sender was involved
+        var result: Any? = null // element taken from the head of the buffer
+        locked {
+            val size = this.size
+            if (size == 0) return closedForSend ?: POLL_FAILED // empty: closedForSend if set, POLL_FAILED otherwise
+            // size > 0: not empty -- retrieve element
+            result = buffer[head]
+            buffer[head] = null
+            this.size = size - 1 // update size before checking queue (!!!)
+            // check for senders that were waiting on full queue
+            var replacement: Any? = POLL_FAILED
+            if (size == capacity) {
+                loop@ while (true) { // every branch below exits the loop, so the body runs at most once
+                    val pollOp = describeTryPoll()
+                    val failure = select.performAtomicTrySelect(pollOp) // null result means success
+                    when {
+                        failure == null -> { // polled successfully
+                            send = pollOp.result
+                            token = pollOp.resumeToken
+                            check(token != null) // a successful poll must have produced a resume token
+                            replacement = send!!.pollResult // the sender's element refills the freed slot below
+                            break@loop
+                        }
+                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
+                        failure === ALREADY_SELECTED -> {
+                            this.size = size // restore size
+                            buffer[head] = result // restore head
+                            return failure
+                        }
+                        failure is Closed<*> -> {
+                            send = failure
+                            token = failure.tryResumeSend(idempotent = null)
+                            replacement = failure // isClosed(replacement) routes this into the trySelect branch below
+                            break@loop
+                        }
+                        else -> error("performAtomicTrySelect(describeTryPoll) returned $failure")
+                    }
+                }
+            }
+            if (replacement !== POLL_FAILED && !isClosed(replacement)) {
+                this.size = size // restore size
+                buffer[(head + size) % capacity] = replacement
+            } else {
+                // failed to poll or is already closed --> let's try to select receiving this element from buffer
+                if (!select.trySelect(null)) {
+                    this.size = size // restore size
+                    buffer[head] = result // restore head
+                    return ALREADY_SELECTED
+                }
+            }
+            head = (head + 1) % capacity // the head element is consumed on both paths that reach this line
+        }
+        // complete the send that we have taken the replacement from
+        if (token != null)
+            send!!.completeResumeSend(token!!) // runs after the locked block
+        return result
+    }
 }
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index fc97bb9..f46c218 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -19,6 +19,9 @@
 import kotlinx.coroutines.experimental.CancellationException
 import kotlinx.coroutines.experimental.CoroutineScope
 import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.select
 import kotlinx.coroutines.experimental.yield
 
 /**
@@ -40,7 +43,7 @@
     public val isFull: Boolean
 
     /**
-     * Adds [element] into to this queue, suspending the caller while this queue [isFull],
+     * Adds [element] to this channel, suspending the caller while this channel [isFull],
      * or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
      * It throws the original [close] cause exception if the channel has _failed_.
      *
@@ -51,6 +54,9 @@
      *
      * Note, that this function does not check for cancellation when it is not suspended.
      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * This function can be used in [select] invocation with [onSend][SelectBuilder.onSend] clause.
+     * Use [offer] to try sending to this channel without waiting.
      */
     public suspend fun send(element: E)
 
@@ -63,6 +69,12 @@
     public fun offer(element: E): Boolean
 
     /**
+     * Registers [onSend][SelectBuilder.onSend] select clause.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R)
+
+    /**
      * Closes this channel with an optional exceptional [cause].
      * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
      * Conceptually, its sends a special close token of this channel. Immediately after invocation of this function
@@ -109,6 +121,9 @@
      *
      * Note, that this function does not check for cancellation when it is not suspended.
      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * This function can be used in [select] invocation with [onReceive][SelectBuilder.onReceive] clause.
+     * Use [poll] to try receiving from this channel without waiting.
      */
     public suspend fun receive(): E
 
@@ -124,6 +139,9 @@
      *
      * Note, that this function does not check for cancellation when it is not suspended.
      * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * This function can be used in [select] invocation with [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause.
+     * Use [poll] to try receiving from this channel without waiting.
      */
     public suspend fun receiveOrNull(): E?
 
@@ -140,6 +158,18 @@
      * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
      */
     public operator fun iterator(): ChannelIterator<E>
+
+    /**
+     * Registers [onReceive][SelectBuilder.onReceive] select clause.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R)
+
+    /**
+     * Registers [onReceiveOrNull][SelectBuilder.onReceiveOrNull] select clause.
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    public fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R)
 }
 
 /**
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
index ef7b97d..24c6855 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -102,10 +102,10 @@
     produce(context, capacity, block)
 
 private class ProducerCoroutine<E>(
-    context: CoroutineContext,
+    override val parentContext: CoroutineContext,
     override val channel: Channel<E>
-) : AbstractCoroutine<Unit>(context, active = true), ProducerScope<E>, ProducerJob<E>, Channel<E> by channel {
-    override fun afterCompletion(state: Any?) {
+) : AbstractCoroutine<Unit>(active = true), ProducerScope<E>, ProducerJob<E>, Channel<E> by channel {
+    override fun afterCompletion(state: Any?, mode: Int) {
         val cause = (state as? CompletedExceptionally)?.cause
         if (!channel.close(cause) && cause != null)
             handleCoroutineException(context, cause)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index e1a0ebb..f0fe82e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -16,6 +16,8 @@
 
 package kotlinx.coroutines.experimental.channels
 
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
 /**
  * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
  * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
@@ -33,7 +35,7 @@
     protected final override fun offerInternal(element: E): Any {
         while (true) {
             val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
-            val token = receive.tryResumeReceive(element)
+            val token = receive.tryResumeReceive(element, idempotent = null)
             if (token != null) {
                 receive.completeResumeReceive(token)
                 return receive.offerResult
@@ -41,16 +43,38 @@
         }
     }
 
-    // result is `E | POLL_EMPTY | Closed`
+    // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+    protected final override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any { // offer variant that must also win `select`
+        // offer atomically with select
+        val offerOp = describeTryOffer(element)
+        val failure = select.performAtomicTrySelect(offerOp) // null result means success
+        if (failure != null) return failure // any non-null failure is returned to the caller unchanged
+        val receive = offerOp.result // receiver taken by the successful operation
+        receive.completeResumeReceive(offerOp.resumeToken!!) // `!!` throws if the successful op left no resume token
+        return receive.offerResult
+    }
+
+    // result is `E | POLL_FAILED | Closed`
     protected final override fun pollInternal(): Any? {
         while (true) {
-            val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY
-            val token = send.tryResumeSend()
+            val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
+            val token = send.tryResumeSend(idempotent = null)
             if (token != null) {
                 send.completeResumeSend(token)
                 return send.pollResult
             }
         }
     }
+
+    // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? { // poll variant that must also win `select`
+        // poll atomically with select
+        val pollOp = describeTryPoll()
+        val failure = select.performAtomicTrySelect(pollOp) // null result means success
+        if (failure != null) return failure // any non-null failure is returned to the caller unchanged
+        val send = pollOp.result // sender taken by the successful operation
+        send.completeResumeSend(pollOp.resumeToken!!) // `!!` throws if the successful op left no resume token
+        return pollOp.pollResult // the value is read from the descriptor, not from `send`
+    }
 }
 
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
index 5f5ace0..a258d92 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
@@ -36,25 +36,29 @@
         private val CONSENSUS: AtomicReferenceFieldUpdater<AtomicOp, Any?> =
             AtomicReferenceFieldUpdater.newUpdater(AtomicOp::class.java, Any::class.java, "_consensus")
 
+        @JvmStatic
         private val UNDECIDED: Any = Symbol("UNDECIDED")
     }
 
     val isDecided: Boolean get() = _consensus !== UNDECIDED
 
+    fun tryDecide(decision: Any?): Boolean { // returns true only for the call that installed its decision
+        check(decision !== UNDECIDED) // UNDECIDED is the "not decided yet" marker and cannot itself be a decision
+        return CONSENSUS.compareAndSet(this, UNDECIDED, decision) // CAS on _consensus; fails once any decision is recorded
+    }
+
+    private fun decide(decision: Any?): Any? = if (tryDecide(decision)) decision else _consensus // ours if the CAS won, otherwise the decision already recorded
+
     abstract fun prepare(): Any? // `null` if Ok, or failure reason
+
     abstract fun complete(affected: Any?, failure: Any?) // failure != null if failed to prepare op
 
     // returns `null` on success
     final override fun perform(affected: Any?): Any? {
         // make decision on status
-        var decision: Any?
-        while (true) {
-            decision = this._consensus
-            if (decision !== UNDECIDED) break
-            decision = prepare()
-            check(decision !== UNDECIDED)
-            if (CONSENSUS.compareAndSet(this, UNDECIDED, decision)) break
-        }
+        var decision = this._consensus
+        if (decision === UNDECIDED)
+            decision = decide(prepare())
         complete(affected, decision)
         return decision
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index 10f95ee..be23aa3 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -46,6 +46,11 @@
 public typealias RemoveFirstDesc<T> = LockFreeLinkedListNode.RemoveFirstDesc<T>
 
 /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public typealias AddLastDesc = LockFreeLinkedListNode.AddLastDesc
+
+/**
  * Doubly-linked concurrent list node with remove support.
  * Based on paper
  * ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf)
@@ -85,8 +90,10 @@
         _removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
 
     @PublishedApi
-    internal abstract class CondAddOp(val newNode: Node) : AtomicOp() {
-        lateinit var oldNext: Node
+    internal abstract class CondAddOp(
+        @JvmField val newNode: Node
+    ) : AtomicOp() {
+        @JvmField var oldNext: Node? = null
 
         override fun complete(affected: Any?, failure: Any?) {
             affected as Node // type assertion
@@ -94,7 +101,7 @@
             val update = if (success) newNode else oldNext
             if (NEXT.compareAndSet(affected, this, update)) {
                 // only the thread the makes this update actually finishes add operation
-                if (success) newNode.finishAdd(oldNext)
+                if (success) newNode.finishAdd(oldNext!!)
             }
         }
     }
@@ -121,7 +128,7 @@
         while (true) {
             val prev = this._prev as Node // this sentinel node is never removed
             if (prev.next === this) return prev
-            helpInsert(prev)
+            helpInsert(prev, null)
         }
     }
 
@@ -153,6 +160,8 @@
         }
     }
 
+    public fun describeAddLast(node: Node): AddLastDesc = AddLastDesc(this, node) // descriptor with this node as `queue` and [node] as the node to add
+
     /**
      * Adds last item to this list atomically if the [condition] is true.
      */
@@ -224,7 +233,8 @@
             val next = this.next
             if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
             check(next !== this) // sanity check -- can be true for sentinel nodes only, but they are never removed
-            if (NEXT.compareAndSet(this, next, (next as Node).removed())) {
+            val removed = (next as Node).removed()
+            if (NEXT.compareAndSet(this, next, removed)) {
                 // was removed successfully (linearized remove) -- fixup the list
                 finishRemove(next)
                 return true
@@ -239,11 +249,11 @@
             override var originalNext: Node? = null
             override fun failure(affected: Node, next: Any): Any? =
                 if (next is Removed) ALREADY_REMOVED else null
-            override fun onPrepare(affected: Node, next: Node): Boolean {
+            override fun onPrepare(affected: Node, next: Node): Any? {
                 originalNext = next
-                return true
+                return null // always success
             }
-            override fun updatedNext(next: Node) = next.removed()
+            override fun updatedNext(affected: Node, next: Node) = next.removed()
             override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
         }
     }
@@ -283,17 +293,62 @@
 
     // ------ multi-word atomic operations helpers ------
 
+    public open class AddLastDesc(val queue: Node, val node: Node) : AbstractAtomicDesc() { // atomic-operation descriptor that links `node` in just before `queue`
+        init {
+            // require freshly allocated node here
+            check(node._next === node && node._prev === node) // the node must still point to itself in both directions
+        }
+
+        final override fun takeAffectedNode(op: OpDescriptor): Node { // returns the node currently preceding `queue`, helping pending operations on the way
+            while (true) {
+                val prev = queue._prev as Node // this sentinel node is never removed
+                val next = prev._next
+                if (next === queue) return prev // all is good -> linked properly
+                if (next === op) return prev // all is good -> our operation descriptor is already there
+                if (next is OpDescriptor) { // some other operation descriptor -> help & retry
+                    next.perform(prev)
+                    continue
+                }
+                // linked improperly -- help insert
+                queue.helpInsert(prev, op)
+            }
+        }
+
+        final override var affectedNode: Node? = null // assigned in onPrepare
+        final override val originalNext: Node? get() = queue // always `queue`, matching the `next === queue` case in takeAffectedNode
+
+        override fun retry(affected: Node, next: Any): Boolean = next !== queue // retry whenever `affected` is not directly followed by `queue`
+
+        override fun onPrepare(affected: Node, next: Node): Any? { // records the affected node; never reports a failure
+            affectedNode = affected
+            return null // always success
+        }
+
+        override fun updatedNext(affected: Node, next: Node): Any { // initializes node's links and returns it as the new successor of `affected`
+            // it is invoked only on successful completion of operation, but this invocation can be stale,
+            // so we must use CAS to set both prev & next pointers
+            PREV.compareAndSet(node, node, affected) // takes effect only while node._prev still points to node itself
+            NEXT.compareAndSet(node, node, queue) // likewise for node._next
+            return node
+        }
+
+        override fun finishOnSuccess(affected: Node, next: Node) { // post-success fixup; `affected` and `next` are not used
+            node.finishAdd(queue)
+        }
+    }
+
     public open class RemoveFirstDesc<T>(val queue: Node) : AbstractAtomicDesc() {
         @Suppress("UNCHECKED_CAST")
         public val result: T get() = affectedNode!! as T
 
-        final override fun takeAffectedNode(): Node = queue.next as Node
+        final override fun takeAffectedNode(op: OpDescriptor): Node = queue.next as Node
         final override var affectedNode: Node? = null
         final override var originalNext: Node? = null
 
         // check node predicates here, must signal failure if affect is not of type T
         protected override fun failure(affected: Node, next: Any): Any? =
                 if (affected === queue) LIST_EMPTY else null
+
         // validate the resulting node (return false if it should be deleted)
         protected open fun validatePrepared(node: T): Boolean = true // false means remove node & retry
 
@@ -302,38 +357,55 @@
             affected.helpDelete() // must help delete, or loose lock-freedom
             return true
         }
+
         @Suppress("UNCHECKED_CAST")
-        final override fun onPrepare(affected: Node, next: Node): Boolean {
+        final override fun onPrepare(affected: Node, next: Node): Any? {
             check(affected !is LockFreeLinkedListHead)
-            if (!validatePrepared(affected as T)) return false
+            if (!validatePrepared(affected as T)) return REMOVE_PREPARED
             affectedNode = affected
             originalNext = next
-            return true
+            return null // ok
         }
-        final override fun updatedNext(next: Node): Any = next.removed()
+
+        final override fun updatedNext(affected: Node, next: Node): Any = next.removed()
         final override fun finishOnSuccess(affected: Node, next: Node) = affected.finishRemove(next)
     }
 
     public abstract class AbstractAtomicDesc : AtomicDesc() {
         protected abstract val affectedNode: Node?
         protected abstract val originalNext: Node?
-        protected open fun takeAffectedNode(): Node = affectedNode!!
+        protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!!
         protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
         protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
-        protected abstract fun onPrepare(affected: Node, next: Node): Boolean // false means: remove node & retry
-        protected abstract fun updatedNext(next: Node): Any
+        protected abstract fun onPrepare(affected: Node, next: Node): Any? // non-null on failure
+        protected abstract fun updatedNext(affected: Node, next: Node): Any
         protected abstract fun finishOnSuccess(affected: Node, next: Node)
 
         // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation
         // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise)
         private class PrepareOp(
-            val next: Node,
-            val op: AtomicOp,
-            val desc: AbstractAtomicDesc
+            @JvmField val next: Node,
+            @JvmField val op: AtomicOp,
+            @JvmField val desc: AbstractAtomicDesc
         ) : OpDescriptor() {
             override fun perform(affected: Any?): Any? {
                 affected as Node // type assertion
-                if (!desc.onPrepare(affected, next)) return REMOVE_PREPARED
+                val decision = desc.onPrepare(affected, next)
+                if (decision != null) {
+                    if (decision === REMOVE_PREPARED) {
+                        // remove element on failure
+                        val removed = next.removed()
+                        if (NEXT.compareAndSet(affected, this, removed)) {
+                            affected.helpDelete()
+                        }
+                    } else {
+                        // some other failure -- mark as decided
+                        op.tryDecide(decision)
+                        // undo preparations
+                        NEXT.compareAndSet(affected, this, next)
+                    }
+                    return decision
+                }
                 check(desc.affectedNode === affected)
                 check(desc.originalNext === next)
                 val update: Any = if (op.isDecided) next else op // restore if decision was already reached
@@ -344,7 +416,7 @@
 
         final override fun prepare(op: AtomicOp): Any? {
             while (true) { // lock free loop on next
-                val affected = takeAffectedNode()
+                val affected = takeAffectedNode(op)
                 // read its original next pointer first
                 val next = affected._next
                 // then see if already reached consensus on overall operation
@@ -362,25 +434,20 @@
                 val prepareOp = PrepareOp(next as Node, op, this)
                 if (NEXT.compareAndSet(affected, next, prepareOp)) {
                     // prepared -- complete preparations
-                    val prepFail = prepareOp.perform(affected) ?: return null // prepared successfully
-                    check(prepFail === REMOVE_PREPARED) // the only way for prepare to fail
-                    if (NEXT.compareAndSet(affected, prepareOp, next.removed())) {
-                        affected.helpDelete()
-                    }
+                    val prepFail = prepareOp.perform(affected)
+                    if (prepFail === REMOVE_PREPARED) continue // retry
+                    return prepFail
                 }
             }
         }
 
         final override fun complete(op: AtomicOp, failure: Any?) {
             val success = failure == null
-            val update = if (success) updatedNext(originalNext!!) else originalNext
-            val affectedNode = affectedNode
-            if (affectedNode == null) {
-                check(!success)
-                return
-            }
+            val affectedNode = affectedNode ?: run { check(!success); return }
+            val originalNext = this.originalNext ?: run { check(!success); return }
+            val update = if (success) updatedNext(affectedNode, originalNext) else originalNext
             if (NEXT.compareAndSet(affectedNode, op, update)) {
-                if (success) finishOnSuccess(affectedNode, originalNext!!)
+                if (success) finishOnSuccess(affectedNode, originalNext)
             }
         }
     }
@@ -394,7 +461,7 @@
             if (PREV.compareAndSet(next, nextPrev, this)) {
                 if (this.next is Removed) {
                     // already removed
-                    next.helpInsert(nextPrev as Node)
+                    next.helpInsert(nextPrev as Node, null)
                 }
                 return
             }
@@ -403,7 +470,7 @@
 
     private fun finishRemove(next: Node) {
         helpDelete()
-        next.helpInsert(_prev.unwrap())
+        next.helpInsert(_prev.unwrap(), null)
     }
 
     private fun markPrev(): Node {
@@ -454,12 +521,17 @@
     }
 
     // fixes prev links from this node
-    private fun helpInsert(_prev: Node) {
+    private fun helpInsert(_prev: Node, op: OpDescriptor?) {
         var prev: Node = _prev
         var last: Node? = null // will be set so that last.next === prev
         while (true) {
             // move the the left until first non-removed node
-            val prevNext = prev.next
+            val prevNext = prev._next
+            if (prevNext === op) return // part of the same op -- don't recurse
+            if (prevNext is OpDescriptor) { // help & retry
+                prevNext.perform(prev)
+                continue
+            }
             if (prevNext is Removed) {
                 if (last !== null) {
                     prev.markPrev()
@@ -490,9 +562,11 @@
         check(prev === this._prev)
         check(next === this._next)
     }
+
+    override fun toString(): String = "${this::class.java.simpleName}@${Integer.toHexString(System.identityHashCode(this))}"
 }
 
-private class Removed(val ref: Node) {
+private class Removed(@JvmField val ref: Node) {
     override fun toString(): String = "Removed[$ref]"
 }
 
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
new file mode 100644
index 0000000..dfb059c
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
+import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.SendChannel
+import kotlinx.coroutines.experimental.internal.AtomicDesc
+import kotlin.coroutines.experimental.Continuation
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
+
+/**
+ * Scope for [select] invocation.
+ */
+public interface SelectBuilder<in R> : CoroutineScope {
+    /**
+     * Clause for [Deferred.await] suspending function that selects the given [block] with the deferred value once
+     * it is resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
+     * is cancelled).
+     */
+    public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)
+
+    /**
+     * Clause for [SendChannel.send] suspending function that selects the given [block] when the [element] is sent to
+     * the channel. The [select] invocation fails with [ClosedSendChannelException] if the channel
+     * [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
+     * [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)
+
+    /**
+     * Clause for [ReceiveChannel.receive] suspending function that selects the given [block] with the element that
+     * is received from the channel. The [select] invocation fails with [ClosedReceiveChannelException] if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
+     * [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)
+
+    /**
+     * Clause for [ReceiveChannel.receiveOrNull] suspending function that selects the given [block] with the element that
+     * is received from the channel or selects the given [block] with `null` if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
+     * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     */
+    public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R)
+}
+
+/**
+ * Internal representation of select instance. This instance is called _selected_ when
+ * the clause to execute is already picked.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface SelectInstance<in R> {
+    /**
+     * Returns `true` when this [select] statement had already picked a clause to execute.
+     */
+    public val isSelected: Boolean
+
+    /**
+     * Tries to select this instance.
+     */
+    public fun trySelect(idempotent: Any?): Boolean
+
+    /**
+     * Performs action atomically with [trySelect].
+     */
+    public fun performAtomicTrySelect(desc: AtomicDesc): Any?
+
+    /**
+     * Performs action atomically when [isSelected] is `false`.
+     */
+    public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
+
+    public val completion: Continuation<R>
+
+    public fun resumeSelectWithException(exception: Throwable)
+
+    public fun invokeOnCompletion(handler: CompletionHandler): Job.Registration
+
+    public fun unregisterOnCompletion(registration: Job.Registration)
+}
+
+/**
+ * Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
+ * in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
+ * is either _selected_ or _fails_.
+ *
+ * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
+ * becomes the result of the select. If any clause _fails_, then the select invocation produces the
+ * corresponding exception. No clause is selected in this case.
+ *
+ * There is no `default` clause for select expression. Instead, each selectable suspending function has the
+ * corresponding non-suspending version that can be used with a regular `when` expression to select one
+ * of the alternatives or to perform default (`else`) action if none of them can be immediately selected.
+ *
+ * | **Receiver**     | **Suspending function**                       | **Select clause**                                | **Non-suspending version**
+ * | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
+ * | [Deferred]       | [await][Deferred.await]                       | [onAwait][SelectBuilder.onAwait]                 | [isCompleted][Deferred.isCompleted]
+ * | [SendChannel]    | [send][SendChannel.send]                      | [onSend][SelectBuilder.onSend]                   | [offer][SendChannel.offer]
+ * | [ReceiveChannel] | [receive][ReceiveChannel.receive]             | [onReceive][SelectBuilder.onReceive]             | [poll][ReceiveChannel.poll]
+ * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended select is *atomic* -- when this function
+ * throws [CancellationException] it means that no clause was selected.
+ *
+ * Note that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+public inline suspend fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
+    suspendCoroutineOrReturn { cont ->
+        val scope = SelectBuilderImpl(cont)
+        try {
+            builder(scope)
+        } catch (e: Throwable) {
+            scope.handleBuilderException(e)
+        }
+        scope.initSelectResult()
+    }
+
+/*
+   :todo: It is best to rewrite this class without the use of CancellableContinuationImpl and JobSupport infrastructure
+   This way JobSupport will not have to provide the trySelect(idempotent) operation and can save some checks and bytes
+   needed to carry that idempotent start token.
+ */
+@PublishedApi
+internal class SelectBuilderImpl<in R>(
+    delegate: Continuation<R>
+) : CancellableContinuationImpl<R>(delegate, active = false), SelectBuilder<R>, SelectInstance<R> {
+    @PublishedApi
+    internal fun handleBuilderException(e: Throwable) {
+        if (trySelect(idempotent = null)) {
+            val token = tryResumeWithException(e)
+            if (token != null)
+                completeResume(token)
+            else
+                handleCoroutineException(context, e)
+        }
+    }
+
+    @PublishedApi
+    internal fun initSelectResult(): Any? {
+        if (!isSelected) initCancellability()
+        return getResult()
+    }
+
+    // coroutines that are started inside this select are directly subordinate to the parent job
+    override fun createContext(): CoroutineContext = delegate.context
+
+    override fun onParentCompletion(cause: Throwable?) {
+        /*
+           Select is cancelled only when no clause was selected yet. If a clause was selected, then
+           it is the concern of the coroutine that was started by that clause to cancel on its suspension
+           points.
+         */
+        if (trySelect(null))
+            cancel(cause)
+    }
+
+    override fun defaultResumeMode(): Int = MODE_DIRECT // all resumes through completion are dispatched directly
+
+    override val completion: Continuation<R> get() {
+        check(isSelected) { "Must be selected first" }
+        return this
+    }
+
+    override fun resumeSelectWithException(exception: Throwable) {
+        check(isSelected) { "Must be selected first" }
+        resumeWithException(exception, mode = 0)
+    }
+
+    override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
+        registerSelectAwait(this@SelectBuilderImpl, block)
+    }
+
+    override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
+        registerSelectSend(this@SelectBuilderImpl, element, block)
+    }
+
+    override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
+        registerSelectReceive(this@SelectBuilderImpl, block)
+    }
+
+    override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
+        registerSelectReceiveOrNull(this@SelectBuilderImpl, block)
+    }
+
+    override fun unregisterOnCompletion(registration: Job.Registration) {
+        invokeOnCompletion(UnregisterOnCompletion(this, registration))
+    }
+}