Basic Channel interfaces and RendezvousChannel implementation
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 64d815b..4daf5f1 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -20,15 +20,37 @@
* Returns `true` if this continuation was cancelled. It implies that [isActive] is `false`.
*/
val isCancelled: Boolean
+
+ /**
+ * Tries to resume this continuation with a given value and returns `true` if it was successful,
+ * or `false` otherwise (it was already resumed or cancelled).
+ *
+ * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
+ * is updated (so that it cannot be cancelled anymore), but before it is actually resumed.
+ */
+ fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+
+ /**
+ * Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
+ * [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
+ */
+ fun initCancellability()
}
/**
* Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
+ *
+ * If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
+ * cancellable until [CancellableContinuation.initCancellability] is invoked.
*/
-public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
+public inline suspend fun <T> suspendCancellableCoroutine(
+ holdCancellability: Boolean = false,
+ crossinline block: (CancellableContinuation<T>) -> Unit
+): T =
suspendCoroutineOrReturn { cont ->
val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
+ if (!holdCancellability) safe.initCancellability()
block(safe)
safe.getResult()
}
@@ -63,7 +85,9 @@
const val YIELD = 3 // used by cancellable "yield"
}
- init { initParentJob(parentJob) }
+ override fun initCancellability() {
+ initParentJob(parentJob)
+ }
fun getResult(): Any? {
val decision = this.decision // volatile read
@@ -80,6 +104,16 @@
override val isCancelled: Boolean
get() = getState() is Cancelled
+    override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+        // Lock-free retry loop: updateState reports false when the state changed between the read and the update,
+        // in which case the state is read again and the decision is taken afresh.
+        while (true) {
+            val current = getState() // atomic read
+            if (current !is Active) return false // cannot resume -- not active anymore
+            if (updateState(current, value, onSuccess)) return true
+        }
+    }
+
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?) {
val decision = this.decision // volatile read
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 8d767b1..d34edc8 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -213,14 +213,16 @@
/**
* Tries to update current [state][getState] of this job.
*/
- fun updateState(expect: Any, update: Any?): Boolean {
+ fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
require(expect is Active && update !is Active) // only active -> inactive transition is allowed
if (!STATE.compareAndSet(this, expect, update)) return false
// #1. Update linked state before invoking completion handlers
onStateUpdate(update)
// #2. Unregister from parent job
registration?.unregister() // volatile read registration _after_ state was updated
- // #3. Invoke completion handlers
+ // #3. Additional (optional) callback
+ onSuccess?.invoke(update)
+ // #4. Invoke completion handlers
val reason = (update as? CompletedExceptionally)?.cancelReason
var completionException: Throwable? = null
when (expect) {
@@ -242,7 +244,7 @@
// otherwise -- do nothing (Empty)
else -> check(expect == Empty)
}
- // #4. Do other (overridable) processing after completion handlers
+ // #5. Do other (overridable) processing after completion handlers
completionException?.let { handleCompletionException(it) }
afterCompletion(update)
return true
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
new file mode 100644
index 0000000..ef96c21
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -0,0 +1,167 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellationException
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.yield
+import java.util.*
+
+/**
+ * Sender's interface to [Channel].
+ */
+public interface SendChannel<in E> {
+ /**
+ * Returns `true` if this channel was closed by invocation of [close] and thus
+ * the [send] attempt will throw [ClosedSendChannelException].
+ */
+ public val isClosedForSend: Boolean
+
+ /**
+ * Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
+ * This function returns `false` for [isClosedForSend] channel.
+ */
+ public val isFull: Boolean
+
+ /**
+ * Adds [element] into this queue, suspending the caller while this queue [isFull],
+ * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended send is *atomic* -- when this function
+ * throws [CancellationException] it means that the [element] was not sent to this channel.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend fun send(element: E)
+
+ /**
+ * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
+ * and returns `true`. Otherwise, it returns `false` immediately
+ * or throws [ClosedSendChannelException] if the channel [isClosedForSend].
+ */
+ public fun offer(element: E): Boolean
+
+ /**
+ * Closes this channel. This is an idempotent operation -- repeated invocations of this function have no effect.
+ * Conceptually, it sends a special close token over this channel. Immediately after invocation of this function
+ * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+ * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
+ * are received.
+ */
+ public fun close()
+}
+
+/**
+ * Receiver's interface to [Channel].
+ */
+public interface ReceiveChannel<out E> {
+ /**
+ * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
+ * side and all previously sent items were already received, so that the [receive] attempt will
+ * throw [ClosedReceiveChannelException].
+ */
+ public val isClosedForReceive: Boolean
+
+ /**
+ * Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
+ * This function returns `false` for [isClosedForReceive] channel.
+ */
+ public val isEmpty: Boolean
+
+ /**
+ * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+ * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended receive is *atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend fun receive(): E
+
+ /**
+ * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+ * or returns `null` if the channel [isClosedForReceive].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended receive is *atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend fun receiveOrNull(): E?
+
+ /**
+ * Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
+ * or [isClosedForReceive].
+ */
+ public fun pool(): E?
+
+ /**
+ * Returns a new iterator to receive elements from this channel using a `for` loop.
+ */
+ public operator fun iterator(): ChannelIterator<E>
+}
+
+/**
+ * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
+ * from concurrent coroutines.
+ */
+public interface ChannelIterator<out E> {
+ /**
+ * Returns `true` if the channel has more elements, suspending the caller while this channel
+ * [isEmpty][ReceiveChannel.isEmpty], or returns `false` if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+ * This function retrieves and removes the element from this channel for the subsequent invocation
+ * of [next].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended receive is *atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend operator fun hasNext(): Boolean
+
+ /**
+ * Retrieves and removes the element from this channel suspending the caller while this channel
+ * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended receive is *atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend operator fun next(): E
+}
+
+/**
+ * Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
+ * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
+ * but it has suspending operations instead of blocking ones and it can be closed.
+ */
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
+
+/**
+ * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
+ */
+public class ClosedSendChannelException : IllegalStateException()
+
+/**
+ * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
+ * channel.
+ */
+public class ClosedReceiveChannelException : NoSuchElementException()
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
new file mode 100644
index 0000000..57095cc
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -0,0 +1,284 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellableContinuation
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.removeOnCompletion
+import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+
+/**
+ * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
+ * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
+ * until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ */
+public class RendezvousChannel<E> : Channel<E> {
+ private val queue = LockFreeLinkedListHead()
+
+ // ------ SendChannel ------
+
+    override val isClosedForSend: Boolean get() = queue.prev() is Closed<*> // closed for send once the "Closed" token is the last node
+
+    override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> // full unless a receiver (or "Closed") is the first node
+
+    suspend override fun send(element: E) { // fails with ClosedSendChannelException when it meets the "Closed" token
+        // fast path if receive is already waiting for rendezvous
+        while (true) { // loop while there are receive waiters (a receiver that is no longer active fails tryResume)
+            val receive = takeFirstReceiveOrPeekClosed() ?: break // presumably null when the head is neither a receiver nor "Closed"
+            if (receive.tryResumeReceiveFromSend(element)) return // resumed it successfully ("Closed" throws here instead)
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont -> // true = holdCancellability, see initCancellability below
+            while (true) {
+                val send = SendElement(cont, element) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(send) { it !is Receive<*> && it !is Closed<*> }) { // never queue behind a receiver or "Closed"
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(send) // presumably unlinks the node once cont completes -- TODO confirm
+                    return@sc
+                }
+                // hm... there are already receivers (maybe), so try taking first
+                takeFirstReceiveOrPeekClosed()?.also { receive ->
+                    if (receive.tryResumeReceiveFromSend(element, cont)) return@sc // on success this sender's cont was completed too
+                }
+            }
+        }
+    }
+
+    override fun offer(element: E): Boolean {
+        // A receiver taken from the queue may already be cancelled: tryResume then reports false and the element
+        // was NOT delivered, so keep trying the next receiver. The "Closed" token throws ClosedSendChannelException.
+        while (true) {
+            val receive = takeFirstReceiveOrPeekClosed() ?: return false // nobody is waiting -- cannot offer now
+            if (receive.tryResumeReceiveFromSend(element)) return true // element was really handed over
+        }
+    }
+
+    override fun close() {
+        while (true) { // every receiver that is still waiting gets completed before "Closed" is enqueued
+            val receive = takeFirstReceiveOrPeekClosed()
+            if (receive == null) {
+                // queue empty or has only senders -- try add last "Closed" item, but never behind a receiver or "Closed"
+                if (queue.addLastIfPrev(Closed<E>()) { it !is ReceiveOrClosed<*> }) return
+                if (isClosedForSend) return else continue // closed already (idempotent) / raced with a receiver -- retry
+            }
+            if (receive is Closed<*>) return // already marked as closed -- nothing to do
+            (receive as Receive<E>).resumeReceiveClosed() // type assertion: not null and not Closed, so a waiting receiver
+        }
+    }
+
+    private fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> } // presumably removes a receiver but only peeks at "Closed" -- TODO confirm
+
+ // ------ ReceiveChannel ------
+
+    override val isClosedForReceive: Boolean get() = queue.next() is Closed<*> // the "Closed" token is the first node
+
+    override val isEmpty: Boolean get() = queue.next() !is Send<*> // note: Closed is a Send as well, so a closed channel is not "empty"
+
+    suspend override fun receive(): E { // fails with ClosedReceiveChannelException when it meets the "Closed" token
+        // fast path if send is already waiting for rendezvous or closed
+        while (true) { // loop while there are send waiters (a sender that is no longer active fails tryResumeSend)
+            val send = takeFirstSendOrPeekClosed() ?: break
+            if (send.tryResumeSend()) return send.element // resumed it successfully (element of "Closed" throws)
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont -> // true = holdCancellability, see initCancellability below
+            while (true) {
+                val receive = ReceiveNonNull(cont) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(receive) { it !is Send<*> }) { // never queue behind a sender or "Closed" (Closed is a Send)
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive) // presumably unlinks the node once cont completes -- TODO confirm
+                    return@sc
+                }
+                // hm... there are already senders (maybe), so try taking first
+                takeFirstSendOrPeekClosed()?.also { send ->
+                    if (send.tryResumeSend()) { // false -- that sender is not active anymore, go around again
+                        send.resumeWithElement(cont) // "Closed" resumes cont with ClosedReceiveChannelException
+                        return@sc
+                    }
+                }
+            }
+        }
+    }
+
+    suspend override fun receiveOrNull(): E? { // same protocol as receive, but the "Closed" token yields null
+        // fast path if send is already waiting for rendezvous or closed
+        while (true) { // loop while there are send waiters (a sender that is no longer active fails tryResumeSend)
+            val send = takeFirstSendOrPeekClosed() ?: break
+            if (send.tryResumeSend()) return send.elementOrNull // resumed it successfully (null for "Closed")
+        }
+        // slow-path does suspend
+        return suspendCancellableCoroutine(true) sc@ { cont -> // true = holdCancellability, see initCancellability below
+            while (true) {
+                val receive = ReceiveOrNull(cont) // must allocate fresh element on each loop
+                if (queue.addLastIfPrev(receive) { it !is Send<*> }) { // never queue behind a sender or "Closed" (Closed is a Send)
+                    cont.initCancellability() // make it properly cancellable
+                    cont.removeOnCompletion(receive) // presumably unlinks the node once cont completes -- TODO confirm
+                    return@sc
+                }
+                // hm... there are already senders (maybe), so try taking first
+                takeFirstSendOrPeekClosed()?.also { send ->
+                    if (send.tryResumeSend()) { // false -- that sender is not active anymore, go around again
+                        send.resumeWithElementOrNull(cont) // "Closed" resumes cont with null
+                        return@sc
+                    }
+                }
+            }
+        }
+    }
+
+    override fun pool(): E? {
+        while (true) { // loop: a sender taken from the queue may not be active anymore, then try the next one
+            val waiter = takeFirstSendOrPeekClosed() ?: return null // no sender is waiting -- nothing to take
+            if (waiter.tryResumeSend()) return waiter.elementOrNull // null (not an exception) for the "Closed" token
+        }
+    }
+
+ override fun iterator(): ChannelIterator<E> = Iterator(this)
+
+    private fun takeFirstSendOrPeekClosed(): Send<E>? =
+        queue.removeFirstIfIsInstanceOfOrPeekIf<Send<E>> { it is Closed<*> } // presumably removes a sender but only peeks at "Closed" -- TODO confirm
+
+    private companion object {
+        const val IS_UNKNOWN = 0 // iterator state: nothing buffered, hasNext has to ask the channel
+        const val IS_HAS_VALUE = 1 // iterator state: an element is buffered for next()
+        const val IS_CLOSED = 2 // iterator state: the "Closed" token was observed
+    }
+
+ private class Iterator<E>(val channel: RendezvousChannel<E>) : ChannelIterator<E> {
+ var state: Int = IS_UNKNOWN
+ var value: E? = null
+
+        suspend override fun hasNext(): Boolean {
+            when (state) { // an answer is already buffered -- reply without touching the channel
+                IS_HAS_VALUE -> return true
+                IS_CLOSED -> return false
+            }
+            // fast path if send is already waiting for rendezvous
+            while (true) { // loop while there are send waiters (a sender that is no longer active fails tryResumeSend)
+                val send = channel.takeFirstSendOrPeekClosed() ?: break
+                if (send.tryResumeSend()) return updateStateWithSend(send) // buffers the element or records "Closed"
+            }
+            // slow-path does suspend
+            return suspendCancellableCoroutine(true) sc@ { cont -> // true = holdCancellability, see initCancellability below
+                while (true) {
+                    val receive = ReceiveHasNext(this, cont) // must allocate fresh element on each loop
+                    if (channel.queue.addLastIfPrev(receive) { it !is Send<*> }) { // never queue behind a sender or "Closed"
+                        cont.initCancellability() // make it properly cancellable
+                        cont.removeOnCompletion(receive) // presumably unlinks the node once cont completes -- TODO confirm
+                        return@sc
+                    }
+                    // hm... there are already senders (maybe), so try taking first
+                    channel.takeFirstSendOrPeekClosed()?.also { send ->
+                        if (send.tryResumeSend()) { // false -- that sender is not active anymore, go around again
+                            cont.resume(updateStateWithSend(send)) // iterator state is updated first, then cont gets the answer
+                            return@sc
+                        }
+                    }
+                }
+            }
+        }
+
+        private fun updateStateWithSend(send: Send<E>): Boolean {
+            val closed = send is Closed<*> // the "Closed" token ends the iteration
+            if (closed) {
+                state = IS_CLOSED
+            } else {
+                state = IS_HAS_VALUE // keep the element so that next() can hand it out
+                value = send.element
+            }
+            return !closed // doubles as the answer of hasNext
+        }
+
+        suspend override fun next(): E {
+            // Returns the element buffered by a preceding hasNext, or receives one directly when there is none.
+            val current = state
+            if (current == IS_CLOSED) throw ClosedReceiveChannelException()
+            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as IS_UNKNOWN)
+            if (current != IS_HAS_VALUE) return channel.receive()
+            // Regular case: hand out the buffered element and forget it, so that the following hasNext call
+            // goes back to the channel.
+            val result = value as E
+            state = IS_UNKNOWN
+            value = null
+            return result
+        }
+ }
+
+    private abstract class Send<out E> : LockFreeLinkedListNode() { // queue node met by receivers: SendElement or Closed
+        abstract val element: E // result for receive / pool / iterator
+        abstract val elementOrNull: E? // result for receiveOrNull
+        abstract fun tryResumeSend(): Boolean // true when this node was claimed and its element may be consumed
+        abstract fun resumeWithElement(cont: CancellableContinuation<E>) // completes a suspended receive
+        abstract fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) // completes a suspended receiveOrNull
+    }
+
+    private class SendElement<out E>(
+        val cont: CancellableContinuation<Unit>, // continuation of the suspended send
+        override val element: E
+    ) : Send<E>() {
+        override val elementOrNull: E? get() = element
+        override fun tryResumeSend(): Boolean = cont.tryResume(Unit) // false if the sender was already resumed or cancelled
+        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resume(element) // parameter cont (the receiver's) shadows the property
+        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(element) // parameter cont (the receiver's) shadows the property
+    }
+
+    private interface ReceiveOrClosed<in E> { // what a sender can meet in the queue: a waiting Receive or the Closed token
+        fun tryResumeReceiveFromSend(value: E): Boolean // hands value over; false if the receiver could not be resumed
+        fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean // same, and completes the sender's cont on success
+    }
+
+    private class Closed<E> : Send<E>(), ReceiveOrClosed<E> { // the close token: receivers see it as a Send, senders as a ReceiveOrClosed
+        override val element: E get() = throw ClosedReceiveChannelException() // receive on a closed channel fails
+        override val elementOrNull: E? get() = null // receiveOrNull on a closed channel yields null
+        override fun tryResumeSend(): Boolean = true // always "succeeds": no sender continuation stands behind this token
+        override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resumeWithException(ClosedReceiveChannelException())
+        override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(null)
+        override fun tryResumeReceiveFromSend(value: E): Boolean = throw ClosedSendChannelException() // non-suspended sender: fail right away
+        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
+            cont.resumeWithException(ClosedSendChannelException()) // suspended sender: fail through its continuation
+            return true // true = cont was completed, the caller must stop retrying
+        }
+    }
+
+    private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+        // Hands [value] over to this receiver and, only when that worked, resumes the sender's [cont] as well.
+        override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
+            val delivered = tryResumeReceiveFromSend(value)
+            if (delivered) cont.resume(Unit)
+            return delivered
+        }
+        // Completes this waiting receiver because the channel is being closed.
+        abstract fun resumeReceiveClosed()
+    }
+
+    private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() { // node of a suspended receive
+        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value) // false if cont was already resumed or cancelled
+        override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+    }
+
+    private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() { // node of a suspended receiveOrNull
+        override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value) // false if cont was already resumed or cancelled
+        override fun resumeReceiveClosed() = cont.resume(null) // closing is reported as null rather than as an exception
+    }
+
+    private class ReceiveHasNext<E>(
+        val iterator: Iterator<E>, // iterator whose state and value are filled in on resumption
+        val cont: CancellableContinuation<Boolean> // continuation of the suspended hasNext
+    ) : Receive<E>(), (Any?) -> Unit { // the node itself is the onSuccess callback handed to tryResume
+        override fun tryResumeReceiveFromSend(value: E): Boolean {
+            iterator.value = value // tentative value (may fail to resume with it)
+            return cont.tryResume(true, this)
+        }
+
+        override fun resumeReceiveClosed() {
+            cont.tryResume(false, this) // result ignored: if cont is not active anymore there is nothing to report
+        }
+
+        // callback for tryResume onSuccess -- invoked after the state of cont is updated, but before it is resumed
+        override fun invoke(hasNext: Any?) {
+            hasNext as Boolean // type assertion -- that is the value we pass
+            iterator.state = if (hasNext) IS_HAS_VALUE else IS_CLOSED
+            if (!hasNext) iterator.value = null // cleanup tentative value
+        }
+    }
+}
+