Support failed channels (closed for cause), buildChannel coroutine builder
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 9a87ca5..db8e986 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -73,7 +73,7 @@
) : AbstractCoroutine<Unit>(parentContext) {
override fun afterCompletion(state: Any?) {
// note the use of the parent's job context below!
- if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelReason)
+ if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelCause)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 1b1adad..18af647 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -29,7 +29,14 @@
fun tryResume(value: T): Any?
/**
- * Completes the execution of [tryResume] on its non-null result.
+ * Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
+ * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
+ * [completeResume] must be invoked with it.
+ */
+ fun tryResumeWithException(exception: Throwable): Any?
+
+ /**
+ * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
*/
fun completeResume(token: Any)
@@ -117,6 +124,16 @@
}
}
+ override fun tryResumeWithException(exception: Throwable): Any? {
+ while (true) { // lock-free loop on state
+ val state = getState() // atomic read
+ when (state) {
+ is Active -> if (tryUpdateState(state, Failed(exception))) return state
+ else -> return null // cannot resume -- not active anymore
+ }
+ }
+ }
+
override fun completeResume(token: Any) {
completeUpdateState(token, getState())
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 9147ede..5040d34 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -57,7 +57,7 @@
public fun onCompletion(handler: CompletionHandler): Registration
/**
- * Cancel this activity with an optional cancellation [reason]. The result is `true` if this job was
+ * Cancel this activity with an optional cancellation [cause]. The result is `true` if this job was
* cancelled as a result of this invocation and `false` otherwise
* (if it was already cancelled or it is [NonCancellable]).
* Repeated invocation of this function has no effect and always produces `false`.
@@ -66,7 +66,7 @@
* at the corresponding original cancellation site and passed into this method to aid in debugging by providing
* both the context of cancellation and text description of the reason.
*/
- public fun cancel(reason: Throwable? = null): Boolean
+ public fun cancel(cause: Throwable? = null): Boolean
@Suppress("DeprecatedCallableAddReplaceWith")
@Deprecated(message = "Operator '+' on two Job objects is meaningless. " +
@@ -231,7 +231,7 @@
fun completeUpdateState(expect: Any, update: Any?) {
// #3. Invoke completion handlers
- val reason = (update as? CompletedExceptionally)?.cancelReason
+ val reason = (update as? CompletedExceptionally)?.cancelCause
var completionException: Throwable? = null
when (expect) {
// SINGLE/SINGLE+ state -- one completion handler (common case)
@@ -295,7 +295,7 @@
}
// is not active anymore
else -> {
- handler((state as? Cancelled)?.cancelReason)
+ handler((state as? Cancelled)?.cancelCause)
return EmptyRegistration
}
}
@@ -327,10 +327,10 @@
}
}
- final override fun cancel(reason: Throwable?): Boolean {
+ final override fun cancel(cause: Throwable?): Boolean {
while (true) { // lock-free loop on state
val state = this.state as? Active ?: return false // quit if not active anymore
- if (updateState(state, Cancelled(reason))) return true
+ if (updateState(state, Cancelled(cause))) return true
}
}
@@ -371,31 +371,31 @@
* Abstract class for a [state][getState] of a job that had completed exceptionally, including cancellation.
*/
internal abstract class CompletedExceptionally {
- abstract val cancelReason: Throwable // original reason or fresh CancellationException
+ abstract val cancelCause: Throwable // original reason or fresh CancellationException
abstract val exception: Throwable // the exception to be thrown in continuation
- // convert cancelReason to CancellationException on first need
+ // convert cancelCause to CancellationException on first need
@Volatile
private var _cancellationException: CancellationException? = null
val cancellationException: CancellationException get() =
_cancellationException ?: // atomic read volatile var or else build new
- (cancelReason as? CancellationException ?:
- CancellationException(cancelReason.message).apply { initCause(cancelReason) })
- .also { _cancellationException = it }
-
+ (cancelCause as? CancellationException ?:
+ CancellationException(cancelCause.message)
+ .apply { initCause(cancelCause) })
+ .also { _cancellationException = it }
}
/**
* Represents a [state][getState] of a cancelled job.
*/
- internal class Cancelled(specifiedReason: Throwable?) : CompletedExceptionally() {
+ internal class Cancelled(specifiedCause: Throwable?) : CompletedExceptionally() {
@Volatile
- private var _cancelReason = specifiedReason // materialize CancellationException on first need
+ private var _cancelCause = specifiedCause // materialize CancellationException on first need
- override val cancelReason: Throwable get() =
- _cancelReason ?: // atomic read volatile var or else create new
- CancellationException("Job was cancelled without specified reason").also { _cancelReason = it }
+ override val cancelCause: Throwable get() =
+ _cancelCause ?: // atomic read volatile var or else create new
+ CancellationException("Job was cancelled").also { _cancelCause = it }
override val exception: Throwable get() = cancellationException
}
@@ -404,7 +404,7 @@
* Represents a [state][getState] of a failed job.
*/
internal class Failed(override val exception: Throwable) : CompletedExceptionally() {
- override val cancelReason: Throwable get() = exception
+ override val cancelCause: Throwable get() = exception
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index 5d253e6..ba54f35 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -16,5 +16,5 @@
override val isActive: Boolean get() = true
override fun getInactiveCancellationException(): CancellationException = throw IllegalStateException("This job is always active")
override fun onCompletion(handler: CompletionHandler): Job.Registration = EmptyRegistration
- override fun cancel(reason: Throwable?): Boolean = false
+ override fun cancel(cause: Throwable?): Boolean = false
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index 7cd231f..7978fd1 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -20,24 +20,25 @@
/**
* Tries to add element to buffer or to queued receiver.
- * Return type is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`.
+ * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
*/
- protected abstract fun offerInternal(element: E): Int
+ protected abstract fun offerInternal(element: E): Any
/**
* Tries to remove element from buffer or from queued sender.
- * Return type is `E | POLL_EMPTY | POLL_CLOSED`
+ * Return type is `E | POLL_EMPTY | Closed`
*/
protected abstract fun pollInternal(): Any?
- // ------ state function for concrete implementations ------
+ // ------ state functions for concrete implementations ------
- protected val isClosedTokenFirstInQueue: Boolean get() = queue.next() is Closed<*>
+ protected val closedForReceive: Any? get() = queue.next() as? Closed<*>
+ protected val closedForSend: Any? get() = queue.prev() as? Closed<*>
// ------ SendChannel ------
- override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+ override val isClosedForSend: Boolean get() = closedForSend != null
override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
suspend override fun send(element: E) {
@@ -47,12 +48,14 @@
return sendSuspend(element)
}
- override fun offer(element: E): Boolean =
- when (offerInternal(element)) {
- OFFER_SUCCESS -> true
- OFFER_FAILED -> false
- else -> throw ClosedSendChannelException()
+ override fun offer(element: E): Boolean {
+ val result = offerInternal(element)
+ return when {
+ result === OFFER_SUCCESS -> true
+ result is Closed<*> -> throw result.sendException
+ else -> false
}
+ }
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
val send = SendElement(cont, element)
@@ -63,13 +66,14 @@
return@sc
}
// hm... something is not right. try to offer
- when (offerInternal(element)) {
- OFFER_SUCCESS -> {
+ val result = offerInternal(element)
+ when {
+ result === OFFER_SUCCESS -> {
cont.resume(Unit)
return@sc
}
- OFFER_CLOSED -> {
- cont.resumeWithException(ClosedSendChannelException())
+ result is Closed<*> -> {
+ cont.resumeWithException(result.sendException)
return@sc
}
}
@@ -82,17 +86,18 @@
else
queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
- override fun close() {
+ override fun close(cause: Throwable?): Boolean {
+ val closed = Closed<E>(cause)
while (true) {
val receive = takeFirstReceiveOrPeekClosed()
if (receive == null) {
// queue empty or has only senders -- try add last "Closed" item to the queue
- if (queue.addLastIfPrev(Closed<E>(), { it !is ReceiveOrClosed<*> })) return
+ if (queue.addLastIfPrev(closed, { it !is ReceiveOrClosed<*> })) return true
continue // retry on failure
}
- if (receive is Closed<*>) return // already marked as closed -- nothing to do
+ if (receive is Closed<*>) return false // already marked as closed -- nothing to do
receive as Receive<E> // type assertion
- receive.resumeReceiveClosed()
+ receive.resumeReceiveClosed(closed)
}
}
@@ -101,20 +106,25 @@
// ------ ReceiveChannel ------
- override val isClosedForReceive: Boolean get() = isClosedTokenFirstInQueue && isBufferEmpty
+ override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
@Suppress("UNCHECKED_CAST")
suspend override fun receive(): E {
// fast path -- try poll non-blocking
val result = pollInternal()
- if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
- if (result !== POLL_EMPTY) return result as E
+ if (result !== POLL_EMPTY) return receiveResult(result)
// slow-path does suspend
return receiveSuspend()
}
@Suppress("UNCHECKED_CAST")
+ private fun receiveResult(result: Any?): E {
+ if (result is Closed<*>) throw result.receiveException
+ return result as E
+ }
+
+ @Suppress("UNCHECKED_CAST")
private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
val receive = ReceiveNonNull(cont)
while (true) {
@@ -125,8 +135,8 @@
}
// hm... something is not right. try to poll
val result = pollInternal()
- if (result === POLL_CLOSED) {
- cont.resumeWithException(ClosedReceiveChannelException())
+ if (result is Closed<*>) {
+ cont.resumeWithException(result.receiveException)
return@sc
}
if (result !== POLL_EMPTY) {
@@ -146,13 +156,21 @@
suspend override fun receiveOrNull(): E? {
// fast path -- try poll non-blocking
val result = pollInternal()
- if (result === POLL_CLOSED) return null
- if (result !== POLL_EMPTY) return result as E
+ if (result !== POLL_EMPTY) return receiveOrNullResult(result)
// slow-path does suspend
return receiveOrNullSuspend()
}
@Suppress("UNCHECKED_CAST")
+ private fun receiveOrNullResult(result: Any?): E? {
+ if (result is Closed<*>) {
+ if (result.closeCause != null) throw result.receiveException
+ return null
+ }
+ return result as E
+ }
+
+ @Suppress("UNCHECKED_CAST")
private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
val receive = ReceiveOrNull(cont)
while (true) {
@@ -163,8 +181,11 @@
}
// hm... something is not right. try to poll
val result = pollInternal()
- if (result === POLL_CLOSED) {
- cont.resume(null)
+ if (result is Closed<*>) {
+ if (result.closeCause == null)
+ cont.resume(null)
+ else
+ cont.resumeWithException(result.receiveException)
return@sc
}
if (result !== POLL_EMPTY) {
@@ -177,7 +198,7 @@
@Suppress("UNCHECKED_CAST")
override fun poll(): E? {
val result = pollInternal()
- return if (result === POLL_EMPTY || result === POLL_CLOSED) null else result as E
+ return if (result === POLL_EMPTY) null else receiveOrNullResult(result)
}
override fun iterator(): ChannelIterator<E> = Iterator(this)
@@ -186,12 +207,14 @@
queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
protected companion object {
- const val OFFER_SUCCESS = 0
- const val OFFER_FAILED = 1
- const val OFFER_CLOSED = 2
+ const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+
+ val OFFER_SUCCESS: Any = Marker("OFFER_SUCCESS")
+ val OFFER_FAILED: Any = Marker("OFFER_FAILED")
val POLL_EMPTY: Any = Marker("POLL_EMPTY")
- val POLL_CLOSED: Any = Marker("POLL_CLOSED")
+
+ fun isClosed(result: Any?): Boolean = result is Closed<*>
}
// for debugging
@@ -204,14 +227,22 @@
suspend override fun hasNext(): Boolean {
// check for repeated hasNext
- if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+ if (result !== POLL_EMPTY) return hasNextResult(result)
// fast path -- try poll non-blocking
result = channel.pollInternal()
- if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+ if (result !== POLL_EMPTY) return hasNextResult(result)
// slow-path does suspend
return hasNextSuspend()
}
+ private fun hasNextResult(result: Any?): Boolean {
+ if (result is Closed<*>) {
+ if (result.closeCause != null) throw result.receiveException
+ return false
+ }
+ return true
+ }
+
private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutine(true) sc@ { cont ->
val receive = ReceiveHasNext(this, cont)
while (true) {
@@ -221,9 +252,13 @@
return@sc
}
// hm... something is not right. try to poll
- result = channel.pollInternal()
- if (result === POLL_CLOSED) {
- cont.resume(false)
+ val result = channel.pollInternal()
+ this.result = result
+ if (result is Closed<*>) {
+ if (result.closeCause == null)
+ cont.resume(false)
+ else
+ cont.resumeWithException(result.receiveException)
return@sc
}
if (result !== POLL_EMPTY) {
@@ -235,11 +270,11 @@
@Suppress("UNCHECKED_CAST")
suspend override fun next(): E {
- if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+ val result = this.result
+ if (result is Closed<*>) throw result.receiveException
if (result !== POLL_EMPTY) {
- val value = this.result as E
this.result = POLL_EMPTY
- return value
+ return result as E
}
// rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
return channel.receive()
@@ -247,13 +282,13 @@
}
protected interface Send {
- val pollResult: Any? // E | POLL_CLOSED
+ val pollResult: Any? // E | Closed
fun tryResumeSend(): Any?
fun completeResumeSend(token: Any)
}
protected interface ReceiveOrClosed<in E> {
- val offerResult: Int // OFFER_SUCCESS | OFFER_CLOSED
+ val offerResult: Any // OFFER_SUCCESS | Closed
fun tryResumeReceive(value: E): Any?
fun completeResumeReceive(token: Any)
}
@@ -267,30 +302,51 @@
override fun completeResumeSend(token: Any) = cont.completeResume(token)
}
- private class Closed<in E> : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
- override val offerResult get() = OFFER_CLOSED
- override val pollResult get() = POLL_CLOSED
+ private class Closed<in E>(
+ val closeCause: Throwable?
+ ) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+ @Volatile
+ var _sendException: Throwable? = null
+
+ val sendException: Throwable get() = _sendException ?:
+ (closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE))
+ .also { _sendException = it }
+
+ @Volatile
+ var _receiveException: Throwable? = null
+
+ val receiveException: Throwable get() = _receiveException ?:
+ (closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE))
+ .also { _receiveException = it }
+
+ override val offerResult get() = this
+ override val pollResult get() = this
override fun tryResumeSend(): Boolean = true
override fun completeResumeSend(token: Any) {}
- override fun tryResumeReceive(value: E): Any? = throw ClosedSendChannelException()
- override fun completeResumeReceive(token: Any) = throw ClosedSendChannelException()
+ override fun tryResumeReceive(value: E): Any? = throw sendException
+ override fun completeResumeReceive(token: Any) = throw sendException
}
private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
override val offerResult get() = OFFER_SUCCESS
- abstract fun resumeReceiveClosed()
+ abstract fun resumeReceiveClosed(closed: Closed<*>)
}
private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
- override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+ override fun resumeReceiveClosed(closed: Closed<*>) = cont.resumeWithException(closed.receiveException)
}
private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
- override fun resumeReceiveClosed() = cont.resume(null)
+ override fun resumeReceiveClosed(closed: Closed<*>) {
+ if (closed.closeCause == null)
+ cont.resume(null)
+ else
+ cont.resumeWithException(closed.receiveException)
+ }
}
private class ReceiveHasNext<E>(
@@ -305,13 +361,15 @@
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
- override fun resumeReceiveClosed() {
- val token = cont.tryResume(false)
+ override fun resumeReceiveClosed(closed: Closed<*>) {
+ val token = if (closed.closeCause == null)
+ cont.tryResume(false)
+ else
+ cont.tryResumeWithException(closed.receiveException)
if (token != null) {
- iterator.result = POLL_CLOSED
+ iterator.result = closed
cont.completeResume(token)
}
}
}
}
-
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index cff2894..6193c35 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -30,12 +30,13 @@
override val isBufferEmpty: Boolean get() = size == 0
override val isBufferFull: Boolean get() = size == capacity
- override fun offerInternal(element: E): Int {
+ // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+ override fun offerInternal(element: E): Any {
var token: Any? = null
var receive: ReceiveOrClosed<E>? = null
locked {
val size = this.size
- if (isClosedForSend) return OFFER_CLOSED
+ closedForSend?.let { return it }
if (size < capacity) {
// tentatively put element to buffer
this.size = size + 1 // update size before checking queue (!!!)
@@ -61,14 +62,14 @@
return receive!!.offerResult
}
- // result is `E | POLL_EMPTY | POLL_CLOSED`
+ // result is `E | POLL_EMPTY | Closed`
override fun pollInternal(): Any? {
var token: Any? = null
var send: Send? = null
var result: Any? = null
locked {
val size = this.size
- if (size == 0) return if (isClosedTokenFirstInQueue) POLL_CLOSED else POLL_EMPTY
+ if (size == 0) return closedForSend ?: POLL_EMPTY
// size > 0: not empty -- retrieve element
result = buffer[head]
buffer[head] = null
@@ -85,7 +86,7 @@
}
}
}
- if (replacement !== POLL_EMPTY && replacement !== POLL_CLOSED) {
+ if (replacement !== POLL_EMPTY && !isClosed(replacement)) {
this.size = size // restore size
buffer[(head + size) % capacity] = replacement
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index fae4060..7f7ec6d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -4,7 +4,6 @@
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.yield
-import java.util.*
/**
* Sender's interface to [Channel].
@@ -12,7 +11,9 @@
public interface SendChannel<in E> {
/**
* Returns `true` if this channel was closed by invocation of [close] and thus
- * the [send] attempt will throw [ClosedSendChannelException].
+ * the [send] attempt throws [ClosedSendChannelException]. If the channel was closed because of the exception, it
+ * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to send
+ * an element to a failed channel throw the original [close] cause exception.
*/
public val isClosedForSend: Boolean
@@ -24,7 +25,8 @@
/**
* Adds [element] into to this queue, suspending the caller while this queue [isFull],
- * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+ * or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
+ * It throws the original [close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
@@ -39,18 +41,24 @@
/**
* Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
* and returns `true`. Otherwise, it returns `false` immediately
- * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+ * or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
+ * It throws the original [close] cause exception if the channel has _failed_.
*/
public fun offer(element: E): Boolean
/**
- * Closes this channel. This is an idempotent operation -- repeated invocations of this function have no effect.
+ * Closes this channel with an optional exceptional [cause].
+ * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
* Conceptually, its sends a special close token of this channel. Immediately after invocation of this function
* [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
+ *
+ * A channel that was closed without a [cause], is considered to be _closed normally_.
+ * A channel that was closed with non-null [cause] is called a _failed channel_. Attempts to send or
+ * receive on a failed channel throw this cause exception.
*/
- public fun close()
+ public fun close(cause: Throwable? = null): Boolean
}
/**
@@ -59,8 +67,10 @@
public interface ReceiveChannel<out E> {
/**
* Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
- * side and all previously sent items were already received, so that the [receive] attempt will
- * throw [ClosedReceiveChannelException].
+ * side and all previously sent items were already received, so that the [receive] attempt
+ * throws [ClosedReceiveChannelException]. If the channel was closed because of the exception, it
+ * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
+ * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
*/
public val isClosedForReceive: Boolean
@@ -73,6 +83,8 @@
/**
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
* or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
+ * If the channel was closed because of the exception, it is called a _failed_ channel and this function
+ * throws the original [close][SendChannel.close] cause exception.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
@@ -86,7 +98,8 @@
/**
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
- * or returns `null` if the channel [isClosedForReceive].
+ * or returns `null` if the channel is [closed][isClosedForReceive] _normally_,
+ * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
@@ -100,12 +113,15 @@
/**
* Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
- * or [isClosedForReceive].
+ * or is [closed][isClosedForReceive] _normally_,
+ * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun poll(): E?
/**
* Returns new iterator to receive elements from this channels using `for` loop.
+ * Iteration completes normally when the channel is [closed][isClosedForReceive] _normally_ and
+ * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public operator fun iterator(): ChannelIterator<E>
}
@@ -117,8 +133,10 @@
public interface ChannelIterator<out E> {
/**
* Returns `true` if the channel has more elements suspending the caller while this channel
- * [isEmpty][ReceiveChannel.isEmpty] or `false` [ClosedReceiveChannelException] if the channel
- * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+ * [isEmpty][ReceiveChannel.isEmpty] or returns `false` if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_.
+ * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ *
* This function retrieves and removes the element from this channel for the subsequent invocation
* of [next].
*
@@ -136,6 +154,7 @@
* Retrieves and removes the element from this channel suspending the caller while this channel
* [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+ * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
@@ -172,12 +191,15 @@
}
/**
- * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
+ * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
+ * that was closed _normally_. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on send attempts.
*/
-public class ClosedSendChannelException : IllegalStateException()
+public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
/**
* Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
- * channel.
+ * channel that was closed _normally_. A _failed_ channel rethrows the original [close][SendChannel.close] cause
+ * exception on receive attempts.
*/
-public class ClosedReceiveChannelException : NoSuchElementException()
+public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt
new file mode 100644
index 0000000..9d49dd2
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelBuilder.kt
@@ -0,0 +1,57 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * Scope for [buildChannel].
+ */
+public interface ChannelBuilder<in E> : CoroutineScope, SendChannel<E>
+
+/**
+ * Return type for [buildChannel].
+ */
+public interface ChannelJob<out E> : Job, ReceiveChannel<E>
+
+/**
+ * Launches new coroutine without blocking current thread to send data over channel
+ * and returns a reference to the coroutine as a [ChannelJob], which implements
+ * both [Job] and [ReceiveChannel].
+ * The scope of the coroutine contains [ChannelBuilder] interface, which implements
+ * both [CoroutineScope] and [SendChannel], so that coroutine can invoke
+ * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
+ * when the coroutine completes.
+ * The running coroutine is cancelled when the its job is [cancelled][Job.cancel].
+ *
+ * The [context] for the new coroutine must be explicitly specified.
+ * See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
+ * The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
+ * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ *
+ * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
+ * the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
+ *
+ * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ */
+public fun <E> buildChannel(
+ context: CoroutineContext,
+ capacity: Int = 0,
+ block: suspend ChannelBuilder<E>.() -> Unit
+): ChannelJob<E> {
+ val channel = Channel<E>(capacity)
+ return ChannelCoroutine(newCoroutineContext(context), channel).apply {
+ initParentJob(context[Job])
+ block.startCoroutine(this, this)
+ }
+}
+
+private class ChannelCoroutine<E>(
+ context: CoroutineContext,
+ val channel: Channel<E>
+) : AbstractCoroutine<Unit>(context), ChannelBuilder<E>, ChannelJob<E>, Channel<E> by channel {
+ override fun afterCompletion(state: Any?) {
+ val cause = (state as? CompletedExceptionally)?.exception
+ if (!channel.close(cause) && cause != null)
+ handleCoroutineException(context, cause)
+ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index 645228e..fa7e3fb 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -13,8 +13,8 @@
override val isBufferEmpty: Boolean get() = true
override val isBufferFull: Boolean get() = true
- // result is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`
- override fun offerInternal(element: E): Int {
+ // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
+ override fun offerInternal(element: E): Any {
while (true) {
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
val token = receive.tryResumeReceive(element)
@@ -25,7 +25,7 @@
}
}
- // result is `E | POLL_EMPTY | POLL_CLOSED`
+ // result is `E | POLL_EMPTY | Closed`
override fun pollInternal(): Any? {
while (true) {
val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY