Basic Channel interfaces and RendezvousChannel implementation
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 64d815b..4daf5f1 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -20,15 +20,37 @@
      * Returns `true` if this continuation was cancelled. It implies that [isActive] is `false`.
      */
     val isCancelled: Boolean
+
+    /**
+     * Tries to resume this continuation with a given value and returns `true` if it was successful,
+     * or `false` otherwise (it was already resumed or cancelled).
+     *
+     * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
+     * is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
+     */
+    fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+
+    /**
+     * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
+     * [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
+     */
+    fun initCancellability()
 }
 
 /**
  * Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
  * the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
+ *
+ * If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
+ * cancellable until [CancellableContinuation.initCancellability] is invoked.
  */
-public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
+public inline suspend fun <T> suspendCancellableCoroutine(
+    holdCancellability: Boolean = false,
+    crossinline block: (CancellableContinuation<T>) -> Unit
+): T =
     suspendCoroutineOrReturn { cont ->
         val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
+        if (!holdCancellability) safe.initCancellability()
         block(safe)
         safe.getResult()
     }
@@ -63,7 +85,9 @@
         const val YIELD = 3 // used by cancellable "yield"
     }
 
-    init { initParentJob(parentJob) }
+    override fun initCancellability() {
+        initParentJob(parentJob)
+    }
 
     fun getResult(): Any? {
         val decision = this.decision // volatile read
@@ -80,6 +104,16 @@
     override val isCancelled: Boolean
         get() = getState() is Cancelled
 
+    override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+        while (true) { // lock-free loop on state
+            val state = getState() // atomic read
+            when (state) {
+                is Active -> if (updateState(state, value, onSuccess)) return true
+                else -> return false // cannot resume -- not active anymore
+            }
+        }
+    }
+
     @Suppress("UNCHECKED_CAST")
     override fun afterCompletion(state: Any?) {
         val decision = this.decision // volatile read
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 8d767b1..d34edc8 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -213,14 +213,16 @@
     /**
      * Tries to update current [state][getState] of this job.
      */
-    fun updateState(expect: Any, update: Any?): Boolean {
+    fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
         require(expect is Active && update !is Active) // only active -> inactive transition is allowed
         if (!STATE.compareAndSet(this, expect, update)) return false
         // #1. Update linked state before invoking completion handlers
         onStateUpdate(update)
         // #2. Unregister from parent job
         registration?.unregister() // volatile read registration _after_ state was updated
-        // #3. Invoke completion handlers
+        // #3. Additional (optional) callback
+        onSuccess?.invoke(update)
+        // #4. Invoke completion handlers
         val reason = (update as? CompletedExceptionally)?.cancelReason
         var completionException: Throwable? = null
         when (expect) {
@@ -242,7 +244,7 @@
             // otherwise -- do nothing (Empty)
             else -> check(expect == Empty)
         }
-        // #4. Do other (overridable) processing after completion handlers
+        // #5. Do other (overridable) processing after completion handlers
         completionException?.let { handleCompletionException(it) }
         afterCompletion(update)
         return true
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
new file mode 100644
index 0000000..ef96c21
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -0,0 +1,167 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellationException
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.yield
+import java.util.*
+
+/**
+ * Sender's interface to [Channel].
+ */
+public interface SendChannel<in E> {
+    /**
+     * Returns `true` if this channel was closed by invocation of [close] and thus
+     * the [send] attempt will throw [ClosedSendChannelException].
+     */
+    public val isClosedForSend: Boolean
+
+    /**
+     * Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
+     * This function returns `false` for [isClosedForSend] channel.
+     */
+    public val isFull: Boolean
+
+    /**
+     * Adds [element] into to this queue, suspending the caller while this queue [isFull],
+     * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended send is *atomic* -- when this function
+     * throws [CancellationException] it means that the [element] was not sent to this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend fun send(element: E)
+
+    /**
+     * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
+     * and returns `true`. Otherwise, it returns `false` immediately
+     * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+     */
+    public fun offer(element: E): Boolean
+
+    /**
+     * Closes this channel. This is an idempotent operation -- repeated invocations of this function have no effect.
+     * Conceptually, its sends a special close token of this channel. Immediately after invocation of this function
+     * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+     * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
+     * are received.
+     */
+    public fun close()
+}
+
+/**
+ * Receiver's interface to [Channel].
+ */
+public interface ReceiveChannel<out E> {
+    /**
+     * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
+     * side and all previously sent items were already received, so that the [receive] attempt will
+     * throw [ClosedReceiveChannelException].
+     */
+    public val isClosedForReceive: Boolean
+
+    /**
+     * Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
+     * This function returns `false` for [isClosedForReceive] channel.
+     */
+    public val isEmpty: Boolean
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+     * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend fun receive(): E
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+     * or returns `null` if the channel [isClosedForReceive].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend fun receiveOrNull(): E?
+
+    /**
+     * Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
+     * or [isClosedForReceive].
+     */
+    public fun pool(): E?
+
+    /**
+     * Returns new iterator to receive elements from this channels using `for` loop.
+     */
+    public operator fun iterator(): ChannelIterator<E>
+}
+
+/**
+ * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
+ * from concurrent coroutines.
+ */
+public interface ChannelIterator<out E> {
+    /**
+     * Returns `true` if the channel has more elements suspending the caller while this channel
+     * [isEmpty][ReceiveChannel.isEmpty] or `false` [ClosedReceiveChannelException] if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+     * This function retrieves and removes the element from this channel for the subsequent invocation
+     * of [next].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend operator fun hasNext(): Boolean
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel
+     * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend operator fun next(): E
+}
+
+/**
+ * Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
+ * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
+ * but it has suspending operations instead of blocking ones and it can be closed.
+ */
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
+
+/**
+ * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
+ */
+public class ClosedSendChannelException : IllegalStateException()
+
+/**
+ * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+ * channel.
+ */
+public class ClosedReceiveChannelException : NoSuchElementException()
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
new file mode 100644
index 0000000..57095cc
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -0,0 +1,284 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellableContinuation
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.removeOnCompletion
+import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+
+/**
+ * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
+ * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
+ * until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ */
+public class RendezvousChannel<E> : Channel<E> {
+    private val queue = LockFreeLinkedListHead()
+
+    // ------ SendChannel ------
+
+    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+
+    override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*>
+
+    suspend override fun send(element: E) {
+        // fast path if receive is already waiting for rendezvous
+        while (true) { // loop while there are receive waiters
+            val receive = takeFirstReceiveOrPeekClosed() ?: break
+            if (receive.tryResumeReceiveFromSend(element)) return // resumed it successfully
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont ->
+            while (true) {
+                val send = SendElement(cont, element) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(send) { it !is Receive<*>  && it !is Closed<*> }) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(send)
+                    return@sc
+                }
+                // hm... there are already receivers (maybe), so try taking first
+                takeFirstReceiveOrPeekClosed()?.also { receive ->
+                    if (receive.tryResumeReceiveFromSend(element, cont)) return@sc
+                }
+            }
+        }
+    }
+
+    override fun offer(element: E): Boolean {
+        takeFirstReceiveOrPeekClosed()?.apply {
+            tryResumeReceiveFromSend(element)
+            return true
+        }
+        return false
+    }
+
+    override fun close() {
+        while (true) {
+            val receive = takeFirstReceiveOrPeekClosed()
+            if (receive == null) {
+                // queue empty or has only senders -- try add last "Closed" item to the queue
+                if (queue.addLastIfPrev(Closed<E>()) { it !is Closed<*> }) return
+            }
+            if (receive is Closed<*>) return // already marked as closed -- nothing to do
+            receive as Receive<E> // type assertion
+            receive.resumeReceiveClosed()
+        }
+    }
+
+    private fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+
+    // ------ ReceiveChannel ------
+
+    override val isClosedForReceive: Boolean get() = queue.next() is Closed<*>
+
+    override val isEmpty: Boolean get() = queue.next() !is Send<*>
+
+    suspend override fun receive(): E {
+        // fast path if send is already waiting for rendezvous or closed
+        while (true) { // loop while there are send waiters
+            val send = takeFirstSendOrPeekClosed() ?: break
+            if (send.tryResumeSend()) return send.element // resumed it successfully
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont ->
+            while (true) {
+                val receive = ReceiveNonNull(cont) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive)
+                    return@sc
+                }
+                // hm... there are already senders (maybe), so try taking first
+                takeFirstSendOrPeekClosed()?.also { send ->
+                    if (send.tryResumeSend()) {
+                        send.resumeWithElement(cont)
+                        return@sc
+                    }
+                }
+            }
+        }
+    }
+
+    suspend override fun receiveOrNull(): E? {
+        // fast path if send is already waiting for rendezvous or closed
+        while (true) { // loop while there are send waiters
+            val send = takeFirstSendOrPeekClosed() ?: break
+            if (send.tryResumeSend()) return send.elementOrNull // resumed it successfully
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont ->
+            while (true) {
+                val receive = ReceiveOrNull(cont) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive)
+                    return@sc
+                }
+                // hm... there are already senders (maybe), so try taking first
+                takeFirstSendOrPeekClosed()?.also { send ->
+                    if (send.tryResumeSend()) {
+                        send.resumeWithElementOrNull(cont)
+                        return@sc
+                    }
+                }
+            }
+        }
+    }
+
+    override fun pool(): E? {
+        while (true) {
+            val waiter = takeFirstSendOrPeekClosed() ?: return null
+            if (waiter.tryResumeSend()) return waiter.element
+        }
+    }
+
+    override fun iterator(): ChannelIterator<E> = Iterator(this)
+
+    private fun takeFirstSendOrPeekClosed(): Send<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<Send<E>> { it is Closed<*> }
+
+    private companion object {
+        val IS_UNKNOWN = 0
+        val IS_HAS_VALUE = 1
+        val IS_CLOSED = 2
+    }
+
+    private class Iterator<E>(val channel: RendezvousChannel<E>) : ChannelIterator<E> {
+        var state: Int = IS_UNKNOWN
+        var value: E? = null
+
+        suspend override fun hasNext(): Boolean {
+            when (state) {
+                IS_HAS_VALUE -> return true
+                IS_CLOSED -> return false
+            }
+            // fast path if send is already waiting for rendezvous
+            while (true) { // loop while there are send waiters
+                val send = channel.takeFirstSendOrPeekClosed() ?: break
+                if (send.tryResumeSend()) return updateStateWithSend(send)
+            }
+            // slow-path does suspend
+            return suspendCancellableCoroutine(true) sc@ { cont ->
+                while (true) {
+                    val receive = ReceiveHasNext(this, cont) // must allocate fresh element on each loop
+                    if (channel.queue.addLastIfPrev(receive) { it !is Send<*> }) {
+                        cont.initCancellability() // make it properly cancellable
+                        cont.removeOnCompletion(receive)
+                        return@sc
+                    }
+                    // hm... there are already senders (maybe), so try taking first
+                    channel.takeFirstSendOrPeekClosed()?.also { send ->
+                        if (send.tryResumeSend()) {
+                            cont.resume(updateStateWithSend(send))
+                            return@sc
+                        }
+                    }
+                }
+            }
+        }
+
+        private fun updateStateWithSend(send: Send<E>): Boolean {
+            if (send is Closed<*>) {
+                state = IS_CLOSED
+                return false
+            } else {
+                state = IS_HAS_VALUE
+                value = send.element
+                return true
+            }
+        }
+
+        suspend override fun next(): E {
+            when (state) {
+                IS_HAS_VALUE -> {
+                    val value = this.value as E
+                    this.state = IS_UNKNOWN
+                    this.value = null
+                    return value
+                }
+                IS_CLOSED -> throw ClosedReceiveChannelException()
+            }
+            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as IS_UNKNOWN)
+            return channel.receive()
+        }
+    }
+
+    private abstract class Send<out E> : LockFreeLinkedListNode() {
+        abstract val element: E
+        abstract val elementOrNull: E?
+        abstract fun tryResumeSend(): Boolean
+        abstract fun resumeWithElement(cont: CancellableContinuation<E>)
+        abstract fun resumeWithElementOrNull(cont: CancellableContinuation<E?>)
+    }
+
+    private class SendElement<out E>(
+            val cont: CancellableContinuation<Unit>,
+            override val element: E
+    ) : Send<E>() {
+        override val elementOrNull: E? get() = element
+        override fun tryResumeSend(): Boolean = cont.tryResume(Unit)
+        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resume(element)
+        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(element)
+    }
+
+    private interface ReceiveOrClosed<in E> {
+        fun tryResumeReceiveFromSend(value: E): Boolean
+        fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean
+    }
+
+    private class Closed<E> : Send<E>(), ReceiveOrClosed<E> {
+        override val element: E get() = throw ClosedReceiveChannelException()
+        override val elementOrNull: E? get() = null
+        override fun tryResumeSend(): Boolean = true
+        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resumeWithException(ClosedReceiveChannelException())
+        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(null)
+        override fun tryResumeReceiveFromSend(value: E): Boolean = throw ClosedSendChannelException()
+        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
+            cont.resumeWithException(ClosedSendChannelException())
+            return true
+        }
+    }
+
+    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
+            if (tryResumeReceiveFromSend(value)) {
+                cont.resume(Unit)
+                return true
+            }
+            return false
+        }
+        abstract fun resumeReceiveClosed()
+    }
+
+    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
+        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
+        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+    }
+
+    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
+        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
+        override fun resumeReceiveClosed() = cont.resume(null)
+    }
+
+    private class ReceiveHasNext<E>(
+            val iterator: Iterator<E>,
+            val cont: CancellableContinuation<Boolean>
+    ) : Receive<E>(), (Any?) -> Unit {
+        override fun tryResumeReceiveFromSend(value: E): Boolean {
+            iterator.value = value // tentative value (may fail to resume with it)
+            return cont.tryResume(true, this)
+        }
+
+        override fun resumeReceiveClosed() {
+            cont.tryResume(false, this)
+        }
+
+        // callback for tryResume onSuccess
+        override fun invoke(hasNext: Any?) {
+            hasNext as Boolean // try assertion -- that is the value we pass
+            iterator.state = if (hasNext) IS_HAS_VALUE else IS_CLOSED
+            if (!hasNext) iterator.value = null // cleanup tentative value
+        }
+    }
+}
+