Support failed channels (closed for cause), buildChannel coroutine builder
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 9a87ca5..db8e986 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -73,7 +73,7 @@
 ) : AbstractCoroutine<Unit>(parentContext) {
     override fun afterCompletion(state: Any?) {
         // note the use of the parent's job context below!
-        if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelReason)
+        if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelCause)
     }
 }
 
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 1b1adad..18af647 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -29,7 +29,14 @@
     fun tryResume(value: T): Any?
 
     /**
-     * Completes the execution of [tryResume] on its non-null result.
+     * Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
+     * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
+     * [completeResume] must be invoked with it.
+     */
+    fun tryResumeWithException(exception: Throwable): Any?
+
+    /**
+     * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
      */
     fun completeResume(token: Any)
 
@@ -117,6 +124,16 @@
         }
     }
 
+    override fun tryResumeWithException(exception: Throwable): Any? {
+        while (true) { // lock-free loop on state
+            val state = getState() // atomic read
+            when (state) {
+                is Active -> if (tryUpdateState(state, Failed(exception))) return state
+                else -> return null // cannot resume -- not active anymore
+            }
+        }
+    }
+
     override fun completeResume(token: Any) {
         completeUpdateState(token, getState())
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 9147ede..5040d34 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -57,7 +57,7 @@
     public fun onCompletion(handler: CompletionHandler): Registration
 
     /**
-     * Cancel this activity with an optional cancellation [reason]. The result is `true` if this job was
+     * Cancel this activity with an optional cancellation [cause]. The result is `true` if this job was
      * cancelled as a result of this invocation and `false` otherwise
      * (if it was already cancelled or it is [NonCancellable]).
      * Repeated invocation of this function has no effect and always produces `false`.
@@ -66,7 +66,7 @@
      * at the corresponding original cancellation site and passed into this method to aid in debugging by providing
      * both the context of cancellation and text description of the reason.
      */
-    public fun cancel(reason: Throwable? = null): Boolean
+    public fun cancel(cause: Throwable? = null): Boolean
 
     @Suppress("DeprecatedCallableAddReplaceWith")
     @Deprecated(message = "Operator '+' on two Job objects is meaningless. " +
@@ -231,7 +231,7 @@
 
     fun completeUpdateState(expect: Any, update: Any?) {
         // #3. Invoke completion handlers
-        val reason = (update as? CompletedExceptionally)?.cancelReason
+        val reason = (update as? CompletedExceptionally)?.cancelCause
         var completionException: Throwable? = null
         when (expect) {
             // SINGLE/SINGLE+ state -- one completion handler (common case)
@@ -295,7 +295,7 @@
                 }
                 // is not active anymore
                 else -> {
-                    handler((state as? Cancelled)?.cancelReason)
+                    handler((state as? Cancelled)?.cancelCause)
                     return EmptyRegistration
                 }
             }
@@ -327,10 +327,10 @@
         }
     }
 
-    final override fun cancel(reason: Throwable?): Boolean {
+    final override fun cancel(cause: Throwable?): Boolean {
         while (true) { // lock-free loop on state
             val state = this.state as? Active ?: return false // quit if not active anymore
-            if (updateState(state, Cancelled(reason))) return true
+            if (updateState(state, Cancelled(cause))) return true
         }
     }
 
@@ -371,31 +371,31 @@
      * Abstract class for a [state][getState] of a job that had completed exceptionally, including cancellation.
      */
     internal abstract class CompletedExceptionally {
-        abstract val cancelReason: Throwable // original reason or fresh CancellationException
+        abstract val cancelCause: Throwable // original cause or fresh CancellationException
         abstract val exception: Throwable // the exception to be thrown in continuation
 
-        // convert cancelReason to CancellationException on first need
+        // convert cancelCause to CancellationException on first need
         @Volatile
         private var _cancellationException: CancellationException? = null
 
         val cancellationException: CancellationException get() =
             _cancellationException ?: // atomic read volatile var or else build new
-                (cancelReason as? CancellationException ?:
-                        CancellationException(cancelReason.message).apply { initCause(cancelReason) })
-                        .also { _cancellationException = it }
-
+                (cancelCause as? CancellationException ?:
+                        CancellationException(cancelCause.message)
+                                .apply { initCause(cancelCause) })
+                                .also { _cancellationException = it }
     }
 
     /**
      * Represents a [state][getState] of a cancelled job.
      */
-    internal class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
+    internal class Cancelled(specifiedCause: Throwable?) : CompletedExceptionally() {
         @Volatile
-        private var _cancelReason = specifiedReason // materialize CancellationException on first need
+        private var _cancelCause = specifiedCause // materialize CancellationException on first need
 
-        override val cancelReason: Throwable get() =
-            _cancelReason ?: // atomic read volatile var or else create new
-                CancellationException("Job was cancelled without specified reason").also { _cancelReason = it }
+        override val cancelCause: Throwable get() =
+            _cancelCause ?: // atomic read volatile var or else create new
+                CancellationException("Job was cancelled").also { _cancelCause = it }
 
         override val exception: Throwable get() = cancellationException
     }
@@ -404,7 +404,7 @@
      * Represents a [state][getState] of a failed job.
      */
     internal class Failed(override val exception: Throwable) : CompletedExceptionally() {
-        override val cancelReason: Throwable get() = exception
+        override val cancelCause: Throwable get() = exception
     }
 }
 
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index 5d253e6..ba54f35 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -16,5 +16,5 @@
     override val isActive: Boolean  get() = true
     override fun getInactiveCancellationException(): CancellationException = throw IllegalStateException("This job is always active")
     override fun onCompletion(handler: CompletionHandler): Job.Registration = EmptyRegistration
-    override fun cancel(reason: Throwable?): Boolean = false
+    override fun cancel(cause: Throwable?): Boolean = false
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index 7cd231f..7978fd1 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -20,24 +20,25 @@
 
     /**
      * Tries to add element to buffer or to queued receiver.
-     * Return type is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`.
+     * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
      */
-    protected abstract fun offerInternal(element: E): Int
+    protected abstract fun offerInternal(element: E): Any
 
     /**
      * Tries to remove element from buffer or from queued sender.
-     * Return type is `E | POLL_EMPTY | POLL_CLOSED`
+     * Return type is `E | POLL_EMPTY | Closed`
      */
     protected abstract fun pollInternal(): Any?
 
 
-    // ------ state function for concrete implementations ------
+    // ------ state functions for concrete implementations ------
 
-    protected val isClosedTokenFirstInQueue: Boolean get() = queue.next() is Closed<*>
+    protected val closedForReceive: Any? get() = queue.next() as? Closed<*>
+    protected val closedForSend: Any? get() = queue.prev() as? Closed<*>
 
     // ------ SendChannel ------
 
-    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+    override val isClosedForSend: Boolean get() = closedForSend != null
     override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
 
     suspend override fun send(element: E) {
@@ -47,12 +48,14 @@
         return sendSuspend(element)
     }
 
-    override fun offer(element: E): Boolean =
-        when (offerInternal(element)) {
-            OFFER_SUCCESS -> true
-            OFFER_FAILED -> false
-            else -> throw ClosedSendChannelException()
+    override fun offer(element: E): Boolean {
+        val result = offerInternal(element) // OFFER_SUCCESS | OFFER_FAILED | Closed
+        return when {
+            result === OFFER_SUCCESS -> true
+            result is Closed<*> -> throw result.sendException // close cause if present, else ClosedSendChannelException
+            else -> false
+        }
+    }
 
     private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
         val send = SendElement(cont, element)
@@ -63,13 +66,14 @@
                 return@sc
             }
             // hm... something is not right. try to offer
-            when (offerInternal(element)) {
-                OFFER_SUCCESS -> {
+            val result = offerInternal(element)
+            when {
+                result === OFFER_SUCCESS -> {
                     cont.resume(Unit)
                     return@sc
                 }
-                OFFER_CLOSED -> {
-                    cont.resumeWithException(ClosedSendChannelException())
+                result is Closed<*> -> {
+                    cont.resumeWithException(result.sendException)
                     return@sc
                 }
             }
@@ -82,17 +86,18 @@
         else
             queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
 
-    override fun close() {
+    override fun close(cause: Throwable?): Boolean {
+        val closed = Closed<E>(cause)
         while (true) {
             val receive = takeFirstReceiveOrPeekClosed()
             if (receive == null) {
                 // queue empty or has only senders -- try add last "Closed" item to the queue
-                if (queue.addLastIfPrev(Closed<E>(), { it !is ReceiveOrClosed<*> })) return
+                if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed<*> })) return true
                 continue // retry on failure
             }
-            if (receive is Closed<*>) return // already marked as closed -- nothing to do
+            if (receive is Closed<*>) return false // already marked as closed -- nothing to do
             receive as Receive<E> // type assertion
-            receive.resumeReceiveClosed()
+            receive.resumeReceiveClosed(closed)
         }
     }
 
@@ -101,20 +106,25 @@
 
     // ------ ReceiveChannel ------
 
-    override val isClosedForReceive: Boolean get() = isClosedTokenFirstInQueue && isBufferEmpty
+    override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
     override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
 
     @Suppress("UNCHECKED_CAST")
     suspend override fun receive(): E {
         // fast path -- try poll non-blocking
         val result = pollInternal()
-        if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
-        if (result !== POLL_EMPTY) return result as E
+        if (result !== POLL_EMPTY) return receiveResult(result)
         // slow-path does suspend
         return receiveSuspend()
     }
 
     @Suppress("UNCHECKED_CAST")
+    private fun receiveResult(result: Any?): E { // converts a poll result into an element for receive()
+        if (result is Closed<*>) throw result.receiveException // close cause if present, else ClosedReceiveChannelException
+        return result as E
+    }
+
+    @Suppress("UNCHECKED_CAST")
     private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
         val receive = ReceiveNonNull(cont)
         while (true) {
@@ -125,8 +135,8 @@
             }
             // hm... something is not right. try to poll
             val result = pollInternal()
-            if (result === POLL_CLOSED) {
-                cont.resumeWithException(ClosedReceiveChannelException())
+            if (result is Closed<*>) {
+                cont.resumeWithException(result.receiveException)
                 return@sc
             }
             if (result !== POLL_EMPTY) {
@@ -146,13 +156,21 @@
     suspend override fun receiveOrNull(): E? {
         // fast path -- try poll non-blocking
         val result = pollInternal()
-        if (result === POLL_CLOSED) return null
-        if (result !== POLL_EMPTY) return result as E
+        if (result !== POLL_EMPTY) return receiveOrNullResult(result)
         // slow-path does suspend
         return receiveOrNullSuspend()
     }
 
     @Suppress("UNCHECKED_CAST")
+    private fun receiveOrNullResult(result: Any?): E? { // converts a poll result for receiveOrNull() and poll()
+        if (result is Closed<*>) {
+            if (result.closeCause != null) throw result.receiveException // closed with a cause: rethrow it
+            return null // closed without a cause
+        }
+        return result as E
+    }
+
+    @Suppress("UNCHECKED_CAST")
     private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
         val receive = ReceiveOrNull(cont)
         while (true) {
@@ -163,8 +181,11 @@
             }
             // hm... something is not right. try to poll
             val result = pollInternal()
-            if (result === POLL_CLOSED) {
-                cont.resume(null)
+            if (result is Closed<*>) {
+                if (result.closeCause == null)
+                    cont.resume(null)
+                else
+                    cont.resumeWithException(result.receiveException)
                 return@sc
             }
             if (result !== POLL_EMPTY) {
@@ -177,7 +198,7 @@
     @Suppress("UNCHECKED_CAST")
     override fun poll(): E? {
         val result = pollInternal()
-        return if (result === POLL_EMPTY || result === POLL_CLOSED) null else result as E
+        return if (result === POLL_EMPTY) null else receiveOrNullResult(result)
     }
 
     override fun iterator(): ChannelIterator<E> = Iterator(this)
@@ -186,12 +207,14 @@
         queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
 
     protected companion object {
-        const val OFFER_SUCCESS = 0
-        const val OFFER_FAILED = 1
-        const val OFFER_CLOSED = 2
+        const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+
+        val OFFER_SUCCESS: Any = Marker("OFFER_SUCCESS")
+        val OFFER_FAILED: Any = Marker("OFFER_FAILED")
 
         val POLL_EMPTY: Any = Marker("POLL_EMPTY")
-        val POLL_CLOSED: Any = Marker("POLL_CLOSED")
+
+        fun isClosed(result: Any?): Boolean = result is Closed<*>
     }
 
     // for debugging
@@ -204,14 +227,22 @@
 
         suspend override fun hasNext(): Boolean {
             // check for repeated hasNext
-            if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+            if (result !== POLL_EMPTY) return hasNextResult(result)
             // fast path -- try poll non-blocking
             result = channel.pollInternal()
-            if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+            if (result !== POLL_EMPTY) return hasNextResult(result)
             // slow-path does suspend
             return hasNextSuspend()
         }
 
+        private fun hasNextResult(result: Any?): Boolean { // interprets a poll result for hasNext()
+            if (result is Closed<*>) {
+                if (result.closeCause != null) throw result.receiveException // closed with a cause: rethrow it
+                return false // closed without a cause: no more elements
+            }
+            return true // any other result is treated as an available element
+        }
+
         private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutine(true) sc@ { cont ->
             val receive = ReceiveHasNext(this, cont)
             while (true) {
@@ -221,9 +252,13 @@
                     return@sc
                 }
                 // hm... something is not right. try to poll
-                result = channel.pollInternal()
-                if (result === POLL_CLOSED) {
-                    cont.resume(false)
+                val result = channel.pollInternal()
+                this.result = result
+                if (result is Closed<*>) {
+                    if (result.closeCause == null)
+                        cont.resume(false)
+                    else
+                        cont.resumeWithException(result.receiveException)
                     return@sc
                 }
                 if (result !== POLL_EMPTY) {
@@ -235,11 +270,11 @@
 
         @Suppress("UNCHECKED_CAST")
         suspend override fun next(): E {
-            if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+            val result = this.result
+            if (result is Closed<*>) throw result.receiveException
             if (result !== POLL_EMPTY) {
-                val value = this.result as E
                 this.result = POLL_EMPTY
-                return value
+                return result as E
             }
             // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
             return channel.receive()
@@ -247,13 +282,13 @@
     }
 
     protected interface Send {
-        val pollResult: Any? // E | POLL_CLOSED
+        val pollResult: Any? // E | Closed
         fun tryResumeSend(): Any?
         fun completeResumeSend(token: Any)
     }
 
     protected interface ReceiveOrClosed<in E> {
-        val offerResult: Int // OFFER_SUCCESS | OFFER_CLOSED
+        val offerResult: Any // OFFER_SUCCESS | Closed
         fun tryResumeReceive(value: E): Any?
         fun completeResumeReceive(token: Any)
     }
@@ -267,30 +302,51 @@
         override fun completeResumeSend(token: Any) = cont.completeResume(token)
     }
 
-    private class Closed<in E> : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
-        override val offerResult get() = OFFER_CLOSED
-        override val pollResult get() = POLL_CLOSED
+    private class Closed<in E>(
+        val closeCause: Throwable?
+    ) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+        @Volatile
+        var _sendException: Throwable? = null
+
+        val sendException: Throwable get() = _sendException ?:
+            (closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE))
+                    .also { _sendException = it }
+
+        @Volatile
+        var _receiveException: Throwable? = null
+
+        val receiveException: Throwable get() = _receiveException ?:
+            (closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE))
+                    .also { _receiveException = it }
+
+        override val offerResult get() = this
+        override val pollResult get() = this
         override fun tryResumeSend(): Boolean = true
         override fun completeResumeSend(token: Any) {}
-        override fun tryResumeReceive(value: E): Any? = throw ClosedSendChannelException()
-        override fun completeResumeReceive(token: Any) = throw ClosedSendChannelException()
+        override fun tryResumeReceive(value: E): Any? = throw sendException
+        override fun completeResumeReceive(token: Any) = throw sendException
     }
 
     private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
         override val offerResult get() = OFFER_SUCCESS
-        abstract fun resumeReceiveClosed()
+        abstract fun resumeReceiveClosed(closed: Closed<*>)
     }
 
     private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
         override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
         override fun completeResumeReceive(token: Any) = cont.completeResume(token)
-        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+        override fun resumeReceiveClosed(closed: Closed<*>) = cont.resumeWithException(closed.receiveException)
     }
 
     private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
         override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
         override fun completeResumeReceive(token: Any) = cont.completeResume(token)
-        override fun resumeReceiveClosed() = cont.resume(null)
+        override fun resumeReceiveClosed(closed: Closed<*>) {
+            if (closed.closeCause == null)
+                cont.resume(null)
+            else
+                cont.resumeWithException(closed.receiveException)
+        }
     }
 
     private class ReceiveHasNext<E>(
@@ -305,13 +361,15 @@
 
         override fun completeResumeReceive(token: Any) = cont.completeResume(token)
 
-        override fun resumeReceiveClosed() {
-            val token = cont.tryResume(false)
+        override fun resumeReceiveClosed(closed: Closed<*>) {
+            val token = if (closed.closeCause == null)
+                cont.tryResume(false)
+            else
+                cont.tryResumeWithException(closed.receiveException)
             if (token != null) {
-                iterator.result = POLL_CLOSED
+                iterator.result = closed
                 cont.completeResume(token)
             }
         }
     }
 }
-
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index cff2894..6193c35 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -30,12 +30,13 @@
     override val isBufferEmpty: Boolean get() = size == 0
     override val isBufferFull: Boolean get() = size == capacity
 
-    override fun offerInternal(element: E): Int {
+    // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+    override fun offerInternal(element: E): Any {
         var token: Any? = null
         var receive: ReceiveOrClosed<E>? = null
         locked {
             val size = this.size
-            if (isClosedForSend) return OFFER_CLOSED
+            closedForSend?.let { return it }
             if (size < capacity) {
                 // tentatively put element to buffer
                 this.size = size + 1 // update size before checking queue (!!!)
@@ -61,14 +62,14 @@
         return receive!!.offerResult
     }
 
-    // result is `E | POLL_EMPTY | POLL_CLOSED`
+    // result is `E | POLL_EMPTY | Closed`
     override fun pollInternal(): Any? {
         var token: Any? = null
         var send: Send? = null
         var result: Any? = null
         locked {
             val size = this.size
-            if (size == 0) return if (isClosedTokenFirstInQueue) POLL_CLOSED else POLL_EMPTY
+            if (size == 0) return closedForReceive ?: POLL_EMPTY // empty buffer: closed for receive only if the Closed token is first in queue
             // size > 0: not empty -- retrieve element
             result = buffer[head]
             buffer[head] = null
@@ -85,7 +86,7 @@
                     }
                 }
             }
-            if (replacement !== POLL_EMPTY && replacement !== POLL_CLOSED) {
+            if (replacement !== POLL_EMPTY && !isClosed(replacement)) {
                 this.size = size // restore size
                 buffer[(head + size) % capacity] = replacement
             }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index fae4060..7f7ec6d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -4,7 +4,6 @@
 import kotlinx.coroutines.experimental.CoroutineScope
 import kotlinx.coroutines.experimental.Job
 import kotlinx.coroutines.experimental.yield
-import java.util.*
 
 /**
  * Sender's interface to [Channel].
@@ -12,7 +11,9 @@
 public interface SendChannel<in E> {
     /**
      * Returns `true` if this channel was closed by invocation of [close] and thus
-     * the [send] attempt will throw [ClosedSendChannelException].
+     * the [send] attempt throws [ClosedSendChannelException]. If the channel was closed because of an exception, it
+     * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to send
+     * an element to a failed channel throw the original [close] cause exception.
      */
     public val isClosedForSend: Boolean
 
@@ -24,7 +25,8 @@
 
     /**
      * Adds [element] into to this queue, suspending the caller while this queue [isFull],
-     * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+     * or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
+     * It throws the original [close] cause exception if the channel has _failed_.
      *
      * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
      * function is suspended, this function immediately resumes with [CancellationException].
@@ -39,18 +41,24 @@
     /**
      * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
      * and returns `true`. Otherwise, it returns `false` immediately
-     * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+     * or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
+     * It throws the original [close] cause exception if the channel has _failed_.
      */
     public fun offer(element: E): Boolean
 
     /**
-     * Closes this channel. This is an idempotent operation -- repeated invocations of this function have no effect.
+     * Closes this channel with an optional exceptional [cause].
+     * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
      * Conceptually, its sends a special close token of this channel. Immediately after invocation of this function
      * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
      * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
      * are received.
+     *
+     * A channel that was closed without a [cause] is considered to be _closed normally_.
+     * A channel that was closed with non-null [cause] is called a _failed channel_. Attempts to send or
+     * receive on a failed channel throw this cause exception.
      */
-    public fun close()
+    public fun close(cause: Throwable? = null): Boolean
 }
 
 /**
@@ -59,8 +67,10 @@
 public interface ReceiveChannel<out E> {
     /**
      * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
-     * side and all previously sent items were already received, so that the [receive] attempt will
-     * throw [ClosedReceiveChannelException].
+     * side and all previously sent items were already received, so that the [receive] attempt
+     * throws [ClosedReceiveChannelException]. If the channel was closed because of an exception, it
+     * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
+     * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
      */
     public val isClosedForReceive: Boolean
 
@@ -73,6 +83,8 @@
     /**
      * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
      * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
+     * If the channel was closed because of an exception, it is called a _failed_ channel and this function
+     * throws the original [close][SendChannel.close] cause exception.
      *
      * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
      * function is suspended, this function immediately resumes with [CancellationException].
@@ -86,7 +98,8 @@
 
     /**
      * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
-     * or returns `null` if the channel [isClosedForReceive].
+     * or returns `null` if the channel is [closed][isClosedForReceive] _normally_,
+     * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
      *
      * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
      * function is suspended, this function immediately resumes with [CancellationException].
@@ -100,12 +113,15 @@
 
     /**
      * Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
-     * or [isClosedForReceive].
+     * or is [closed][isClosedForReceive] _normally_,
+     * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
      */
     public fun poll(): E?
 
     /**
      * Returns new iterator to receive elements from this channels using `for` loop.
+     * Iteration completes normally when the channel is [closed][isClosedForReceive] _normally_ and
+     * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
      */
     public operator fun iterator(): ChannelIterator<E>
 }
@@ -117,8 +133,10 @@
 public interface ChannelIterator<out E> {
     /**
      * Returns `true` if the channel has more elements suspending the caller while this channel
-     * [isEmpty][ReceiveChannel.isEmpty] or `false` [ClosedReceiveChannelException] if the channel
-     * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+     * [isEmpty][ReceiveChannel.isEmpty] or returns `false` if the channel
+     * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_.
+     * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+     *
      * This function retrieves and removes the element from this channel for the subsequent invocation
      * of [next].
      *
@@ -136,6 +154,7 @@
      * Retrieves and removes the element from this channel suspending the caller while this channel
      * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
      * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+     * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
      *
      * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
      * function is suspended, this function immediately resumes with [CancellationException].
@@ -172,12 +191,15 @@
 }
 
 /**
- * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
+ * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
+ * that was closed _normally_. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on send attempts.
  */
-public class ClosedSendChannelException : IllegalStateException()
+public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
 
 /**
  * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
- * channel.
+ * channel that was closed _normally_. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on receive attempts.
  */
-public class ClosedReceiveChannelException : NoSuchElementException()
+public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt
new file mode 100644
index 0000000..9d49dd2
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt
@@ -0,0 +1,57 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * Scope for the [buildChannel] coroutine block: a [CoroutineScope] that is also the [SendChannel] to send elements to.
+ */
+public interface ChannelBuilder<in E> : CoroutineScope, SendChannel<E>
+
+/**
+ * Return type for [buildChannel]: the [Job] of the launched coroutine that is also the [ReceiveChannel] of its elements.
+ */
+public interface ChannelJob<out E> : Job, ReceiveChannel<E>
+
+/**
+ * Launches new coroutine without blocking current thread to send data over channel
+ * and returns a reference to the coroutine as a [ChannelJob], which implements
+ * both [Job] and [ReceiveChannel].
+ * The scope of the coroutine contains [ChannelBuilder] interface, which implements
+ * both [CoroutineScope] and [SendChannel], so that coroutine can invoke
+ * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
+ * when the coroutine completes. The running coroutine is cancelled when its job is [cancelled][Job.cancel].
+ * The [capacity] is passed to `Channel(capacity)` to create the underlying channel (zero by default).
+ *
+ * The [context] for the new coroutine must be explicitly specified.
+ * See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
+ * The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
+ * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ *
+ * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
+ * the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws that exception.
+ *
+ * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ */
+public fun <E> buildChannel(
+    context: CoroutineContext,
+    capacity: Int = 0,
+    block: suspend ChannelBuilder<E>.() -> Unit
+): ChannelJob<E> {
+    val channel = Channel<E>(capacity)
+    return ChannelCoroutine(newCoroutineContext(context), channel).apply {
+        initParentJob(context[Job]) // the Job of the supplied context (if any) becomes the parent
+        block.startCoroutine(this, this) // the coroutine is both the block's receiver and its completion continuation
+    }
+}
+
+private class ChannelCoroutine<E>( // coroutine created by buildChannel; all Channel members are delegated to [channel]
+    context: CoroutineContext,
+    val channel: Channel<E>
+) : AbstractCoroutine<Unit>(context), ChannelBuilder<E>, ChannelJob<E>, Channel<E> by channel {
+    override fun afterCompletion(state: Any?) {
+        val cause = (state as? CompletedExceptionally)?.exception // null unless the state is CompletedExceptionally
+        if (!channel.close(cause) && cause != null) // NOTE(review): false presumably means "was already closed" - confirm
+            handleCoroutineException(context, cause) // close() did not accept the failure, so hand it to the handler
+    }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index 645228e..fa7e3fb 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -13,8 +13,8 @@
     override val isBufferEmpty: Boolean get() = true
     override val isBufferFull: Boolean get() = true
 
-    // result is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`
-    override fun offerInternal(element: E): Int {
+    // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+    override fun offerInternal(element: E): Any {
         while (true) {
             val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
             val token = receive.tryResumeReceive(element)
@@ -25,7 +25,7 @@
         }
     }
 
-    // result is `E | POLL_EMPTY | POLL_CLOSED`
+    // result is `E | POLL_EMPTY | Closed`
     override fun pollInternal(): Any? {
         while (true) {
             val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY