Basic Channel interfaces and RendezvousChannel implementation
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 64d815b..4daf5f1 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -20,15 +20,37 @@
      * Returns `true` if this continuation was cancelled. It implies that [isActive] is `false`.
      */
     val isCancelled: Boolean
+
+    /**
+     * Tries to resume this continuation with a given value and returns `true` if it was successful,
+     * or `false` otherwise (it was already resumed or cancelled).
+     *
+     * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
+     * is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
+     */
+    fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+
+    /**
+     * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
+     * [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
+     */
+    fun initCancellability()
 }
 
 /**
  * Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
  * the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
+ *
+ * If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
+ * cancellable until [CancellableContinuation.initCancellability] is invoked.
  */
-public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
+public inline suspend fun <T> suspendCancellableCoroutine(
+    holdCancellability: Boolean = false,
+    crossinline block: (CancellableContinuation<T>) -> Unit
+): T =
     suspendCoroutineOrReturn { cont ->
         val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
+        if (!holdCancellability) safe.initCancellability()
         block(safe)
         safe.getResult()
     }
@@ -63,7 +85,9 @@
         const val YIELD = 3 // used by cancellable "yield"
     }
 
-    init { initParentJob(parentJob) }
+    override fun initCancellability() {
+        initParentJob(parentJob)
+    }
 
     fun getResult(): Any? {
         val decision = this.decision // volatile read
@@ -80,6 +104,16 @@
     override val isCancelled: Boolean
         get() = getState() is Cancelled
 
+    override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+        while (true) { // lock-free loop on state
+            val state = getState() // atomic read
+            when (state) {
+                is Active -> if (updateState(state, value, onSuccess)) return true
+                else -> return false // cannot resume -- not active anymore
+            }
+        }
+    }
+
     @Suppress("UNCHECKED_CAST")
     override fun afterCompletion(state: Any?) {
         val decision = this.decision // volatile read
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 8d767b1..d34edc8 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -213,14 +213,16 @@
     /**
      * Tries to update current [state][getState] of this job.
      */
-    fun updateState(expect: Any, update: Any?): Boolean {
+    fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
         require(expect is Active && update !is Active) // only active -> inactive transition is allowed
         if (!STATE.compareAndSet(this, expect, update)) return false
         // #1. Update linked state before invoking completion handlers
         onStateUpdate(update)
         // #2. Unregister from parent job
         registration?.unregister() // volatile read registration _after_ state was updated
-        // #3. Invoke completion handlers
+        // #3. Additional (optional) callback
+        onSuccess?.invoke(update)
+        // #4. Invoke completion handlers
         val reason = (update as? CompletedExceptionally)?.cancelReason
         var completionException: Throwable? = null
         when (expect) {
@@ -242,7 +244,7 @@
             // otherwise -- do nothing (Empty)
             else -> check(expect == Empty)
         }
-        // #4. Do other (overridable) processing after completion handlers
+        // #5. Do other (overridable) processing after completion handlers
         completionException?.let { handleCompletionException(it) }
         afterCompletion(update)
         return true
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
new file mode 100644
index 0000000..ef96c21
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -0,0 +1,167 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellationException
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.yield
+import java.util.*
+
+/**
+ * Sender's interface to [Channel].
+ */
+public interface SendChannel<in E> {
+    /**
+     * Returns `true` if this channel was closed by invocation of [close] and thus
+     * the [send] attempt will throw [ClosedSendChannelException].
+     */
+    public val isClosedForSend: Boolean
+
+    /**
+     * Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
+     * This function returns `false` for [isClosedForSend] channel.
+     */
+    public val isFull: Boolean
+
+    /**
+     * Adds [element] into to this queue, suspending the caller while this queue [isFull],
+     * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended send is *atomic* -- when this function
+     * throws [CancellationException] it means that the [element] was not sent to this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend fun send(element: E)
+
+    /**
+     * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
+     * and returns `true`. Otherwise, it returns `false` immediately
+     * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+     */
+    public fun offer(element: E): Boolean
+
+    /**
+     * Closes this channel. This is an idempotent operation -- repeated invocations of this function have no effect.
+     * Conceptually, its sends a special close token of this channel. Immediately after invocation of this function
+     * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+     * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
+     * are received.
+     */
+    public fun close()
+}
+
+/**
+ * Receiver's interface to [Channel].
+ */
+public interface ReceiveChannel<out E> {
+    /**
+     * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
+     * side and all previously sent items were already received, so that the [receive] attempt will
+     * throw [ClosedReceiveChannelException].
+     */
+    public val isClosedForReceive: Boolean
+
+    /**
+     * Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
+     * This function returns `false` for [isClosedForReceive] channel.
+     */
+    public val isEmpty: Boolean
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+     * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend fun receive(): E
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+     * or returns `null` if the channel [isClosedForReceive].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend fun receiveOrNull(): E?
+
+    /**
+     * Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
+     * or [isClosedForReceive].
+     */
+    public fun pool(): E?
+
+    /**
+     * Returns new iterator to receive elements from this channels using `for` loop.
+     */
+    public operator fun iterator(): ChannelIterator<E>
+}
+
+/**
+ * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
+ * from concurrent coroutines.
+ */
+public interface ChannelIterator<out E> {
+    /**
+     * Returns `true` if the channel has more elements suspending the caller while this channel
+     * [isEmpty][ReceiveChannel.isEmpty] or `false` [ClosedReceiveChannelException] if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+     * This function retrieves and removes the element from this channel for the subsequent invocation
+     * of [next].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend operator fun hasNext(): Boolean
+
+    /**
+     * Retrieves and removes the element from this channel suspending the caller while this channel
+     * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     * Cancellation of suspended receive is *atomic* -- when this function
+     * throws [CancellationException] it means that the element was not retrieved from this channel.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     */
+    public suspend operator fun next(): E
+}
+
+/**
+ * Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
+ * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
+ * but it has suspending operations instead of blocking ones and it can be closed.
+ */
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
+
+/**
+ * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
+ */
+public class ClosedSendChannelException : IllegalStateException()
+
+/**
+ * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+ * channel.
+ */
+public class ClosedReceiveChannelException : NoSuchElementException()
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
new file mode 100644
index 0000000..57095cc
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -0,0 +1,284 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellableContinuation
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.removeOnCompletion
+import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+
+/**
+ * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
+ * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
+ * until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ */
+public class RendezvousChannel<E> : Channel<E> {
+    private val queue = LockFreeLinkedListHead()
+
+    // ------ SendChannel ------
+
+    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+
+    override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*>
+
+    suspend override fun send(element: E) {
+        // fast path if receive is already waiting for rendezvous
+        while (true) { // loop while there are receive waiters
+            val receive = takeFirstReceiveOrPeekClosed() ?: break
+            if (receive.tryResumeReceiveFromSend(element)) return // resumed it successfully
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont ->
+            while (true) {
+                val send = SendElement(cont, element) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(send) { it !is Receive<*>  && it !is Closed<*> }) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(send)
+                    return@sc
+                }
+                // hm... there are already receivers (maybe), so try taking first
+                takeFirstReceiveOrPeekClosed()?.also { receive ->
+                    if (receive.tryResumeReceiveFromSend(element, cont)) return@sc
+                }
+            }
+        }
+    }
+
+    override fun offer(element: E): Boolean {
+        takeFirstReceiveOrPeekClosed()?.apply {
+            tryResumeReceiveFromSend(element)
+            return true
+        }
+        return false
+    }
+
+    override fun close() {
+        while (true) {
+            val receive = takeFirstReceiveOrPeekClosed()
+            if (receive == null) {
+                // queue empty or has only senders -- try add last "Closed" item to the queue
+                if (queue.addLastIfPrev(Closed<E>()) { it !is Closed<*> }) return
+            }
+            if (receive is Closed<*>) return // already marked as closed -- nothing to do
+            receive as Receive<E> // type assertion
+            receive.resumeReceiveClosed()
+        }
+    }
+
+    private fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+
+    // ------ ReceiveChannel ------
+
+    override val isClosedForReceive: Boolean get() = queue.next() is Closed<*>
+
+    override val isEmpty: Boolean get() = queue.next() !is Send<*>
+
+    suspend override fun receive(): E {
+        // fast path if send is already waiting for rendezvous or closed
+        while (true) { // loop while there are send waiters
+            val send = takeFirstSendOrPeekClosed() ?: break
+            if (send.tryResumeSend()) return send.element // resumed it successfully
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont ->
+            while (true) {
+                val receive = ReceiveNonNull(cont) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive)
+                    return@sc
+                }
+                // hm... there are already senders (maybe), so try taking first
+                takeFirstSendOrPeekClosed()?.also { send ->
+                    if (send.tryResumeSend()) {
+                        send.resumeWithElement(cont)
+                        return@sc
+                    }
+                }
+            }
+        }
+    }
+
+    suspend override fun receiveOrNull(): E? {
+        // fast path if send is already waiting for rendezvous or closed
+        while (true) { // loop while there are send waiters
+            val send = takeFirstSendOrPeekClosed() ?: break
+            if (send.tryResumeSend()) return send.elementOrNull // resumed it successfully
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont ->
+            while (true) {
+                val receive = ReceiveOrNull(cont) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive)
+                    return@sc
+                }
+                // hm... there are already senders (maybe), so try taking first
+                takeFirstSendOrPeekClosed()?.also { send ->
+                    if (send.tryResumeSend()) {
+                        send.resumeWithElementOrNull(cont)
+                        return@sc
+                    }
+                }
+            }
+        }
+    }
+
+    override fun pool(): E? {
+        while (true) {
+            val waiter = takeFirstSendOrPeekClosed() ?: return null
+            if (waiter.tryResumeSend()) return waiter.element
+        }
+    }
+
+    override fun iterator(): ChannelIterator<E> = Iterator(this)
+
+    private fun takeFirstSendOrPeekClosed(): Send<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<Send<E>> { it is Closed<*> }
+
+    private companion object {
+        val IS_UNKNOWN = 0
+        val IS_HAS_VALUE = 1
+        val IS_CLOSED = 2
+    }
+
+    private class Iterator<E>(val channel: RendezvousChannel<E>) : ChannelIterator<E> {
+        var state: Int = IS_UNKNOWN
+        var value: E? = null
+
+        suspend override fun hasNext(): Boolean {
+            when (state) {
+                IS_HAS_VALUE -> return true
+                IS_CLOSED -> return false
+            }
+            // fast path if send is already waiting for rendezvous
+            while (true) { // loop while there are send waiters
+                val send = channel.takeFirstSendOrPeekClosed() ?: break
+                if (send.tryResumeSend()) return updateStateWithSend(send)
+            }
+            // slow-path does suspend
+            return suspendCancellableCoroutine(true) sc@ { cont ->
+                while (true) {
+                    val receive = ReceiveHasNext(this, cont) // must allocate fresh element on each loop
+                    if (channel.queue.addLastIfPrev(receive) { it !is Send<*> }) {
+                        cont.initCancellability() // make it properly cancellable
+                        cont.removeOnCompletion(receive)
+                        return@sc
+                    }
+                    // hm... there are already senders (maybe), so try taking first
+                    channel.takeFirstSendOrPeekClosed()?.also { send ->
+                        if (send.tryResumeSend()) {
+                            cont.resume(updateStateWithSend(send))
+                            return@sc
+                        }
+                    }
+                }
+            }
+        }
+
+        private fun updateStateWithSend(send: Send<E>): Boolean {
+            if (send is Closed<*>) {
+                state = IS_CLOSED
+                return false
+            } else {
+                state = IS_HAS_VALUE
+                value = send.element
+                return true
+            }
+        }
+
+        suspend override fun next(): E {
+            when (state) {
+                IS_HAS_VALUE -> {
+                    val value = this.value as E
+                    this.state = IS_UNKNOWN
+                    this.value = null
+                    return value
+                }
+                IS_CLOSED -> throw ClosedReceiveChannelException()
+            }
+            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as IS_UNKNOWN)
+            return channel.receive()
+        }
+    }
+
+    private abstract class Send<out E> : LockFreeLinkedListNode() {
+        abstract val element: E
+        abstract val elementOrNull: E?
+        abstract fun tryResumeSend(): Boolean
+        abstract fun resumeWithElement(cont: CancellableContinuation<E>)
+        abstract fun resumeWithElementOrNull(cont: CancellableContinuation<E?>)
+    }
+
+    private class SendElement<out E>(
+            val cont: CancellableContinuation<Unit>,
+            override val element: E
+    ) : Send<E>() {
+        override val elementOrNull: E? get() = element
+        override fun tryResumeSend(): Boolean = cont.tryResume(Unit)
+        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resume(element)
+        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(element)
+    }
+
+    private interface ReceiveOrClosed<in E> {
+        fun tryResumeReceiveFromSend(value: E): Boolean
+        fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean
+    }
+
+    private class Closed<E> : Send<E>(), ReceiveOrClosed<E> {
+        override val element: E get() = throw ClosedReceiveChannelException()
+        override val elementOrNull: E? get() = null
+        override fun tryResumeSend(): Boolean = true
+        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resumeWithException(ClosedReceiveChannelException())
+        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(null)
+        override fun tryResumeReceiveFromSend(value: E): Boolean = throw ClosedSendChannelException()
+        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
+            cont.resumeWithException(ClosedSendChannelException())
+            return true
+        }
+    }
+
+    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
+            if (tryResumeReceiveFromSend(value)) {
+                cont.resume(Unit)
+                return true
+            }
+            return false
+        }
+        abstract fun resumeReceiveClosed()
+    }
+
+    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
+        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
+        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+    }
+
+    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
+        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
+        override fun resumeReceiveClosed() = cont.resume(null)
+    }
+
+    private class ReceiveHasNext<E>(
+            val iterator: Iterator<E>,
+            val cont: CancellableContinuation<Boolean>
+    ) : Receive<E>(), (Any?) -> Unit {
+        override fun tryResumeReceiveFromSend(value: E): Boolean {
+            iterator.value = value // tentative value (may fail to resume with it)
+            return cont.tryResume(true, this)
+        }
+
+        override fun resumeReceiveClosed() {
+            cont.tryResume(false, this)
+        }
+
+        // callback for tryResume onSuccess
+        override fun invoke(hasNext: Any?) {
+            hasNext as Boolean // try assertion -- that is the value we pass
+            iterator.state = if (hasNext) IS_HAS_VALUE else IS_CLOSED
+            if (!hasNext) iterator.value = null // cleanup tentative value
+        }
+    }
+}
+
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
new file mode 100644
index 0000000..5998e08
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
@@ -0,0 +1,103 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import java.util.*
+import kotlin.test.assertEquals
+
+class RendezvousChannelAtomicityStressTest {
+    val TEST_DURATION = 3000L
+
+    val channel = RendezvousChannel<Int>()
+    val senderDone = RendezvousChannel<Boolean>()
+    val receiverDone = RendezvousChannel<Boolean>()
+
+    var lastSent = 0
+    var lastReceived = 0
+
+    var stoppedSender = 0
+    var stoppedReceiver = 0
+
+    var missedCnt = 0
+    var dupCnt = 0
+
+    lateinit var sender: Job
+    lateinit var receiver: Job
+
+    @Test
+    fun testStress() = runBlocking {
+        val deadline = System.currentTimeMillis() + TEST_DURATION
+        launchSender()
+        launchReceiver()
+        val rnd = Random()
+        while (System.currentTimeMillis() < deadline) {
+            when (rnd.nextInt(4)) {
+                0 -> { // cancel & restart sender
+                    stopSender()
+                    launchSender()
+                }
+                1 -> { // cancel & restrat receiver
+                    stopReceier()
+                    launchReceiver()
+                }
+                2 -> yield() // just yield (burn a little time)
+                3 -> delay(1L) // delay for more a bit
+            }
+        }
+        stopSender()
+        stopReceier()
+        println("            Sent $lastSent ints to channel")
+        println("        Received $lastReceived ints from channel")
+        println("  Stopped sender $stoppedSender times")
+        println("Stopped receiver $stoppedReceiver times")
+        println("          Missed $missedCnt ints")
+        println("      Duplicated $dupCnt ints")
+        assertEquals(0, missedCnt)
+        assertEquals(0, dupCnt)
+        assertEquals(lastSent, lastReceived)
+    }
+
+    fun launchSender() {
+        sender = launch(CommonPool) {
+            try {
+                while (true) {
+                    val trySend = lastSent + 1
+                    channel.send(trySend)
+                    lastSent = trySend // update on success
+                }
+            } finally {
+                run(NonCancellable) { senderDone.send(true) }
+            }
+        }
+    }
+
+    suspend fun stopSender() {
+        stoppedSender++
+        sender.cancel()
+        senderDone.receive()
+    }
+
+    fun launchReceiver() {
+        receiver = launch(CommonPool) {
+            try {
+                while (true) {
+                    val received = channel.receive()
+                    val expected = lastReceived + 1
+                    if (received > expected)
+                        missedCnt++
+                    if (received < expected)
+                        dupCnt++
+                    lastReceived = received
+                }
+            } finally {
+                run(NonCancellable) { receiverDone.send(true) }
+            }
+        }
+    }
+
+    suspend fun stopReceier() {
+        stoppedReceiver++
+        receiver.cancel()
+        receiverDone.receive()
+    }
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
new file mode 100644
index 0000000..a8269c3
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
@@ -0,0 +1,84 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.join
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+
+class RendezvousChannelMultipleReceiversStressTest {
+    val nEvents = 1_000_000
+    val nReceivers = 6
+
+    val channel = RendezvousChannel<Int>()
+    val completedSuccessfully = AtomicInteger()
+    val dupes = AtomicInteger()
+    val received = ConcurrentHashMap<Int,Int>()
+    val receivedBy = IntArray(nReceivers)
+
+    @Test
+    fun testStress() = runBlocking {
+        val receivers = List(nReceivers) { receiverIndex ->
+            // different event receivers use different code
+            launch(CommonPool) {
+                when (receiverIndex % 3) {
+                    0 -> doReceive(receiverIndex)
+                    1 -> doReceiveOrNull(receiverIndex)
+                    2 -> doIterator(receiverIndex)
+                }
+                completedSuccessfully.incrementAndGet()
+            }
+        }
+        repeat(nEvents) {
+            channel.send(it)
+        }
+        channel.close()
+        receivers.forEach { it.join() }
+        println("Completed successfully ${completedSuccessfully.get()} coroutines")
+        println("              Received ${received.size} events")
+        println("        Received dupes ${dupes.get()}")
+        repeat(nReceivers) { receiveIndex ->
+            println("        Received by #$receiveIndex ${receivedBy[receiveIndex]}")
+        }
+        assertEquals(nReceivers, completedSuccessfully.get())
+        assertEquals(0, dupes.get())
+        assertEquals(nEvents, received.size)
+        repeat(nReceivers) { receiveIndex ->
+            assertTrue(receivedBy[receiveIndex] > nEvents / nReceivers / 2, "Should be balanced")
+        }
+    }
+
+    private fun doReceived(event: Int) {
+        if (received.put(event, event) != null) {
+            println("Duplicate event $event")
+            dupes.incrementAndGet()
+        }
+    }
+
+    private suspend fun doReceive(receiverIndex: Int) {
+        while (true) {
+            try { doReceived(channel.receive()) }
+            catch (ex: ClosedReceiveChannelException) { break }
+            receivedBy[receiverIndex]++
+        }
+
+    }
+
+    private suspend fun doReceiveOrNull(receiverIndex: Int) {
+        while (true) {
+            doReceived(channel.receiveOrNull() ?: break)
+            receivedBy[receiverIndex]++
+        }
+    }
+
+    private suspend fun doIterator(receiverIndex: Int) {
+        for (event in channel) {
+            doReceived(event)
+            receivedBy[receiverIndex]++
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
new file mode 100644
index 0000000..669607e
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -0,0 +1,126 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class RendezvousChannelTest : TestBase() {
+    @Test
+    fun testSimple() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        check(q.isEmpty && q.isFull)
+        expect(1)
+        val sender = launch(context) {
+            expect(4)
+            q.send(1) // suspend -- the first to come to rendezvous
+            expect(7)
+            q.send(2) // does not suspend -- receiver is there
+            expect(8)
+        }
+        expect(2)
+        val receiver = launch(context) {
+            expect(5)
+            check(q.receive() == 1) // does not suspend -- sender was there
+            expect(6)
+            check(q.receive() == 2) // suspends
+            expect(9)
+        }
+        expect(3)
+        sender.join()
+        receiver.join()
+        check(q.isEmpty && q.isFull)
+        finish(10)
+    }
+
+    @Test
+    fun testStress() = runBlocking {
+        val n = 100_000
+        val q = RendezvousChannel<Int>()
+        val sender = launch(context) {
+            for (i in 1..n) q.send(i)
+            expect(2)
+        }
+        val receiver = launch(context) {
+            for (i in 1..n) check(q.receive() == i)
+            expect(3)
+        }
+        expect(1)
+        sender.join()
+        receiver.join()
+        finish(4)
+    }
+
+    @Test
+    fun testClosedReceiveOrNull() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        check(q.isEmpty && q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+        expect(1)
+        launch(context) {
+            expect(3)
+            assertEquals(42, q.receiveOrNull())
+            expect(4)
+            assertEquals(null, q.receiveOrNull())
+            expect(6)
+        }
+        expect(2)
+        q.send(42)
+        expect(5)
+        q.close()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        yield()
+        check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+        finish(7)
+    }
+
+    @Test
+    fun testClosedExceptions() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        expect(1)
+        launch(context) {
+            expect(4)
+            try { q.receive() }
+            catch (e: ClosedReceiveChannelException) {
+                expect(5)
+            }
+        }
+        expect(2)
+        q.close()
+        expect(3)
+        yield()
+        expect(6)
+        try { q.send(42) }
+        catch (e: ClosedSendChannelException) {
+            finish(7)
+        }
+    }
+
+    @Test
+    fun testOfferAndPool() = runBlocking {
+        val q = RendezvousChannel<Int>()
+        assertFalse(q.offer(1))
+        expect(1)
+        launch(context) {
+            expect(3)
+            assertEquals(null, q.pool())
+            expect(4)
+            assertEquals(2, q.receive())
+            expect(7)
+            assertEquals(null, q.pool())
+            yield()
+            expect(9)
+            assertEquals(3, q.pool())
+            expect(10)
+        }
+        expect(2)
+        yield()
+        expect(5)
+        assertTrue(q.offer(2))
+        expect(6)
+        yield()
+        expect(8)
+        q.send(3)
+        finish(11)
+    }
+}
\ No newline at end of file