ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 4daf5f1..1b1adad 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -22,13 +22,16 @@
val isCancelled: Boolean
/**
- * Tries to resume this continuation with a given value and returns `true` if it was successful,
- * or `false` otherwise (it was already resumed or cancelled).
- *
- * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
- * is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
+ * Tries to resume this continuation with a given value and returns non-null object token if it was successful,
+ * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
+ * [completeResume] must be invoked with it.
*/
- fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+ fun tryResume(value: T): Any?
+
+ /**
+ * Completes the execution of [tryResume] on its non-null result.
+ */
+ fun completeResume(token: Any)
/**
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
@@ -104,16 +107,20 @@
override val isCancelled: Boolean
get() = getState() is Cancelled
- override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+ override fun tryResume(value: T): Any? {
while (true) { // lock-free loop on state
val state = getState() // atomic read
when (state) {
- is Active -> if (updateState(state, value, onSuccess)) return true
- else -> return false // cannot resume -- not active anymore
+ is Active -> if (tryUpdateState(state, value)) return state
+ else -> return null // cannot resume -- not active anymore
}
}
}
+ override fun completeResume(token: Any) {
+ completeUpdateState(token, getState())
+ }
+
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?) {
val decision = this.decision // volatile read
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index d34edc8..9147ede 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -213,16 +213,24 @@
/**
* Tries to update current [state][getState] of this job.
*/
- fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
+ fun updateState(expect: Any, update: Any?): Boolean {
+ if (!tryUpdateState(expect, update)) return false
+ completeUpdateState(expect, update)
+ return true
+ }
+
+ fun tryUpdateState(expect: Any, update: Any?): Boolean {
require(expect is Active && update !is Active) // only active -> inactive transition is allowed
if (!STATE.compareAndSet(this, expect, update)) return false
// #1. Update linked state before invoking completion handlers
onStateUpdate(update)
// #2. Unregister from parent job
registration?.unregister() // volatile read registration _after_ state was updated
- // #3. Additional (optional) callback
- onSuccess?.invoke(update)
- // #4. Invoke completion handlers
+ return true // continues in completeUpdateState
+ }
+
+ fun completeUpdateState(expect: Any, update: Any?) {
+ // #3. Invoke completion handlers
val reason = (update as? CompletedExceptionally)?.cancelReason
var completionException: Throwable? = null
when (expect) {
@@ -244,10 +252,9 @@
// otherwise -- do nothing (Empty)
else -> check(expect == Empty)
}
- // #5. Do other (overridable) processing after completion handlers
+ // #4. Do other (overridable) processing after completion handlers
completionException?.let { handleCompletionException(it) }
afterCompletion(update)
- return true
}
final override val isActive: Boolean get() = state is Active
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
new file mode 100644
index 0000000..7cd231f
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -0,0 +1,317 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellableContinuation
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.removeOnCompletion
+import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+
+/**
+ * Abstract channel. It is a base class for buffered and unbuffered channels.
+ */
+public abstract class AbstractChannel<E> : Channel<E> {
+ private val queue = LockFreeLinkedListHead()
+
+ // ------ extension points for buffered channels ------
+
+ protected abstract val hasBuffer: Boolean
+ protected abstract val isBufferEmpty: Boolean
+ protected abstract val isBufferFull: Boolean
+
+ /**
+ * Tries to add element to buffer or to queued receiver.
+ * Return type is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`.
+ */
+ protected abstract fun offerInternal(element: E): Int
+
+ /**
+ * Tries to remove element from buffer or from queued sender.
+ * Return type is `E | POLL_EMPTY | POLL_CLOSED`
+ */
+ protected abstract fun pollInternal(): Any?
+
+
+ // ------ state function for concrete implementations ------
+
+ protected val isClosedTokenFirstInQueue: Boolean get() = queue.next() is Closed<*>
+
+ // ------ SendChannel ------
+
+ override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+ override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
+
+ suspend override fun send(element: E) {
+ // fast path -- try offer non-blocking
+ if (offer(element)) return
+ // slow-path does suspend
+ return sendSuspend(element)
+ }
+
+ override fun offer(element: E): Boolean =
+ when (offerInternal(element)) {
+ OFFER_SUCCESS -> true
+ OFFER_FAILED -> false
+ else -> throw ClosedSendChannelException()
+ }
+
+ private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
+ val send = SendElement(cont, element)
+ loop@ while (true) {
+ if (enqueueSend(send)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(send)
+ return@sc
+ }
+ // hm... something is not right. try to offer
+ when (offerInternal(element)) {
+ OFFER_SUCCESS -> {
+ cont.resume(Unit)
+ return@sc
+ }
+ OFFER_CLOSED -> {
+ cont.resumeWithException(ClosedSendChannelException())
+ return@sc
+ }
+ }
+ }
+ }
+
+ private fun enqueueSend(send: SendElement) =
+ if (hasBuffer)
+ queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed<*> }, { isBufferFull })
+ else
+ queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
+
+ override fun close() {
+ while (true) {
+ val receive = takeFirstReceiveOrPeekClosed()
+ if (receive == null) {
+ // queue empty or has only senders -- try add last "Closed" item to the queue
+ if (queue.addLastIfPrev(Closed<E>(), { it !is ReceiveOrClosed<*> })) return
+ continue // retry on failure
+ }
+ if (receive is Closed<*>) return // already marked as closed -- nothing to do
+ receive as Receive<E> // type assertion
+ receive.resumeReceiveClosed()
+ }
+ }
+
+ protected fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+ queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+
+ // ------ ReceiveChannel ------
+
+ override val isClosedForReceive: Boolean get() = isClosedTokenFirstInQueue && isBufferEmpty
+ override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
+
+ @Suppress("UNCHECKED_CAST")
+ suspend override fun receive(): E {
+ // fast path -- try poll non-blocking
+ val result = pollInternal()
+ if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+ if (result !== POLL_EMPTY) return result as E
+ // slow-path does suspend
+ return receiveSuspend()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
+ val receive = ReceiveNonNull(cont)
+ while (true) {
+ if (enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ val result = pollInternal()
+ if (result === POLL_CLOSED) {
+ cont.resumeWithException(ClosedReceiveChannelException())
+ return@sc
+ }
+ if (result !== POLL_EMPTY) {
+ cont.resume(result as E)
+ return@sc
+ }
+ }
+ }
+
+ private fun enqueueReceive(receive: Receive<E>) =
+ if (hasBuffer)
+ queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
+ else
+ queue.addLastIfPrev(receive, { it !is Send })
+
+ @Suppress("UNCHECKED_CAST")
+ suspend override fun receiveOrNull(): E? {
+ // fast path -- try poll non-blocking
+ val result = pollInternal()
+ if (result === POLL_CLOSED) return null
+ if (result !== POLL_EMPTY) return result as E
+ // slow-path does suspend
+ return receiveOrNullSuspend()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
+ val receive = ReceiveOrNull(cont)
+ while (true) {
+ if (enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ val result = pollInternal()
+ if (result === POLL_CLOSED) {
+ cont.resume(null)
+ return@sc
+ }
+ if (result !== POLL_EMPTY) {
+ cont.resume(result as E)
+ return@sc
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun poll(): E? {
+ val result = pollInternal()
+ return if (result === POLL_EMPTY || result === POLL_CLOSED) null else result as E
+ }
+
+ override fun iterator(): ChannelIterator<E> = Iterator(this)
+
+ protected fun takeFirstSendOrPeekClosed(): Send? =
+ queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+
+ protected companion object {
+ const val OFFER_SUCCESS = 0
+ const val OFFER_FAILED = 1
+ const val OFFER_CLOSED = 2
+
+ val POLL_EMPTY: Any = Marker("POLL_EMPTY")
+ val POLL_CLOSED: Any = Marker("POLL_CLOSED")
+ }
+
+ // for debugging
+ private class Marker(val string: String) {
+ override fun toString(): String = string
+ }
+
+ private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+ var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
+
+ suspend override fun hasNext(): Boolean {
+ // check for repeated hasNext
+ if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+ // fast path -- try poll non-blocking
+ result = channel.pollInternal()
+ if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+ // slow-path does suspend
+ return hasNextSuspend()
+ }
+
+ private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutine(true) sc@ { cont ->
+ val receive = ReceiveHasNext(this, cont)
+ while (true) {
+ if (channel.enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ result = channel.pollInternal()
+ if (result === POLL_CLOSED) {
+ cont.resume(false)
+ return@sc
+ }
+ if (result !== POLL_EMPTY) {
+ cont.resume(true)
+ return@sc
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ suspend override fun next(): E {
+ if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+ if (result !== POLL_EMPTY) {
+ val value = this.result as E
+ this.result = POLL_EMPTY
+ return value
+ }
+ // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
+ return channel.receive()
+ }
+ }
+
+ protected interface Send {
+ val pollResult: Any? // E | POLL_CLOSED
+ fun tryResumeSend(): Any?
+ fun completeResumeSend(token: Any)
+ }
+
+ protected interface ReceiveOrClosed<in E> {
+ val offerResult: Int // OFFER_SUCCESS | OFFER_CLOSED
+ fun tryResumeReceive(value: E): Any?
+ fun completeResumeReceive(token: Any)
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private class SendElement(
+ val cont: CancellableContinuation<Unit>,
+ override val pollResult: Any?
+ ) : LockFreeLinkedListNode(), Send {
+ override fun tryResumeSend(): Any? = cont.tryResume(Unit)
+ override fun completeResumeSend(token: Any) = cont.completeResume(token)
+ }
+
+ private class Closed<in E> : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+ override val offerResult get() = OFFER_CLOSED
+ override val pollResult get() = POLL_CLOSED
+ override fun tryResumeSend(): Boolean = true
+ override fun completeResumeSend(token: Any) {}
+ override fun tryResumeReceive(value: E): Any? = throw ClosedSendChannelException()
+ override fun completeResumeReceive(token: Any) = throw ClosedSendChannelException()
+ }
+
+ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+ override val offerResult get() = OFFER_SUCCESS
+ abstract fun resumeReceiveClosed()
+ }
+
+ private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
+ override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+ override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+ override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+ }
+
+ private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
+ override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+ override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+ override fun resumeReceiveClosed() = cont.resume(null)
+ }
+
+ private class ReceiveHasNext<E>(
+ val iterator: Iterator<E>,
+ val cont: CancellableContinuation<Boolean>
+ ) : Receive<E>() {
+ override fun tryResumeReceive(value: E): Any? {
+ val token = cont.tryResume(true)
+ if (token != null) iterator.result = value
+ return token
+ }
+
+ override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+
+ override fun resumeReceiveClosed() {
+ val token = cont.tryResume(false)
+ if (token != null) {
+ iterator.result = POLL_CLOSED
+ cont.completeResume(token)
+ }
+ }
+ }
+}
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
new file mode 100644
index 0000000..cff2894
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -0,0 +1,99 @@
+package kotlinx.coroutines.experimental.channels
+
+import java.util.concurrent.locks.ReentrantLock
+
+/**
+ * Channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+public class ArrayChannel<E>(val capacity: Int) : AbstractChannel<E>() {
+ init {
+ check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
+ }
+
+ private val lock = ReentrantLock()
+ private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
+ private var head: Int = 0
+ @Volatile
+ private var size: Int = 0
+
+ private inline fun <T> locked(block: () -> T): T {
+ lock.lock()
+ return try { block() }
+ finally { lock.unlock() }
+ }
+
+ override val hasBuffer: Boolean get() = true
+ override val isBufferEmpty: Boolean get() = size == 0
+ override val isBufferFull: Boolean get() = size == capacity
+
+ override fun offerInternal(element: E): Int {
+ var token: Any? = null
+ var receive: ReceiveOrClosed<E>? = null
+ locked {
+ val size = this.size
+ if (isClosedForSend) return OFFER_CLOSED
+ if (size < capacity) {
+ // tentatively put element to buffer
+ this.size = size + 1 // update size before checking queue (!!!)
+ // check for receivers that were waiting on empty queue
+ if (size == 0) {
+ while (true) {
+ receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
+ token = receive!!.tryResumeReceive(element)
+ if (token != null) {
+ this.size = size // restore size
+ return@locked
+ }
+ }
+ }
+ buffer[(head + size) % capacity] = element // actually queue element
+ return OFFER_SUCCESS
+ }
+ // size == capacity: full
+ return OFFER_FAILED
+ }
+ // breaks here if offer meets receiver
+ receive!!.completeResumeReceive(token!!)
+ return receive!!.offerResult
+ }
+
+ // result is `E | POLL_EMPTY | POLL_CLOSED`
+ override fun pollInternal(): Any? {
+ var token: Any? = null
+ var send: Send? = null
+ var result: Any? = null
+ locked {
+ val size = this.size
+ if (size == 0) return if (isClosedTokenFirstInQueue) POLL_CLOSED else POLL_EMPTY
+ // size > 0: not empty -- retrieve element
+ result = buffer[head]
+ buffer[head] = null
+ this.size = size - 1 // update size before checking queue (!!!)
+ // check for senders that were waiting on full queue
+ var replacement: Any? = POLL_EMPTY
+ if (size == capacity) {
+ while (true) {
+ send = takeFirstSendOrPeekClosed() ?: break
+ token = send!!.tryResumeSend()
+ if (token != null) {
+ replacement = send!!.pollResult
+ break
+ }
+ }
+ }
+ if (replacement !== POLL_EMPTY && replacement !== POLL_CLOSED) {
+ this.size = size // restore size
+ buffer[(head + size) % capacity] = replacement
+ }
+ head = (head + 1) % capacity
+ }
+ // complete send the we're taken replacement from
+ if (token != null)
+ send!!.completeResumeSend(token!!)
+ return result
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index ef96c21..fae4060 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -102,7 +102,7 @@
* Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
* or [isClosedForReceive].
*/
- public fun pool(): E?
+ public fun poll(): E?
/**
* Returns new iterator to receive elements from this channels using `for` loop.
@@ -153,7 +153,23 @@
* Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
* but it has suspending operations instead of blocking ones and it can be closed.
*/
-public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
+ /**
+ * Factory for channels.
+ */
+ public companion object Factory {
+ /**
+ * Creates a channel with specified buffer capacity (or without a buffer by default).
+ */
+ public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
+ check(capacity >= 0) { "Channel capacity cannot be negative, but $capacity was specified" }
+ return if (capacity == 0)
+ RendezvousChannel()
+ else
+ ArrayChannel(capacity)
+ }
+ }
+}
/**
* Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index 57095cc..645228e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -1,283 +1,39 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.CancellableContinuation
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
-import kotlinx.coroutines.experimental.removeOnCompletion
-import kotlinx.coroutines.experimental.suspendCancellableCoroutine
-
/**
* Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
* to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
* until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ *
+ * This implementation is fully lock-free.
*/
-public class RendezvousChannel<E> : Channel<E> {
- private val queue = LockFreeLinkedListHead()
+public class RendezvousChannel<E> : AbstractChannel<E>() {
- // ------ SendChannel ------
+ override val hasBuffer: Boolean get() = false
+ override val isBufferEmpty: Boolean get() = true
+ override val isBufferFull: Boolean get() = true
- override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
-
- override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*>
-
- suspend override fun send(element: E) {
- // fast path if receive is already waiting for rendezvous
- while (true) { // loop while there are receive waiters
- val receive = takeFirstReceiveOrPeekClosed() ?: break
- if (receive.tryResumeReceiveFromSend(element)) return // resumed it successfully
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val send = SendElement(cont, element) // must allocate fresh element on each loop
- if (queue.addLastIfPrev(send) { it !is Receive<*> && it !is Closed<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(send)
- return@sc
- }
- // hm... there are already receivers (maybe), so try taking first
- takeFirstReceiveOrPeekClosed()?.also { receive ->
- if (receive.tryResumeReceiveFromSend(element, cont)) return@sc
- }
- }
- }
- }
-
- override fun offer(element: E): Boolean {
- takeFirstReceiveOrPeekClosed()?.apply {
- tryResumeReceiveFromSend(element)
- return true
- }
- return false
- }
-
- override fun close() {
+ // result is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`
+ override fun offerInternal(element: E): Int {
while (true) {
- val receive = takeFirstReceiveOrPeekClosed()
- if (receive == null) {
- // queue empty or has only senders -- try add last "Closed" item to the queue
- if (queue.addLastIfPrev(Closed<E>()) { it !is Closed<*> }) return
- }
- if (receive is Closed<*>) return // already marked as closed -- nothing to do
- receive as Receive<E> // type assertion
- receive.resumeReceiveClosed()
- }
- }
-
- private fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
- queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
-
- // ------ ReceiveChannel ------
-
- override val isClosedForReceive: Boolean get() = queue.next() is Closed<*>
-
- override val isEmpty: Boolean get() = queue.next() !is Send<*>
-
- suspend override fun receive(): E {
- // fast path if send is already waiting for rendezvous or closed
- while (true) { // loop while there are send waiters
- val send = takeFirstSendOrPeekClosed() ?: break
- if (send.tryResumeSend()) return send.element // resumed it successfully
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val receive = ReceiveNonNull(cont) // must allocate fresh element on each loop
- if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(receive)
- return@sc
- }
- // hm... there are already senders (maybe), so try taking first
- takeFirstSendOrPeekClosed()?.also { send ->
- if (send.tryResumeSend()) {
- send.resumeWithElement(cont)
- return@sc
- }
- }
+ val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
+ val token = receive.tryResumeReceive(element)
+ if (token != null) {
+ receive.completeResumeReceive(token)
+ return receive.offerResult
}
}
}
- suspend override fun receiveOrNull(): E? {
- // fast path if send is already waiting for rendezvous or closed
- while (true) { // loop while there are send waiters
- val send = takeFirstSendOrPeekClosed() ?: break
- if (send.tryResumeSend()) return send.elementOrNull // resumed it successfully
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val receive = ReceiveOrNull(cont) // must allocate fresh element on each loop
- if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(receive)
- return@sc
- }
- // hm... there are already senders (maybe), so try taking first
- takeFirstSendOrPeekClosed()?.also { send ->
- if (send.tryResumeSend()) {
- send.resumeWithElementOrNull(cont)
- return@sc
- }
- }
- }
- }
- }
-
- override fun pool(): E? {
+ // result is `E | POLL_EMPTY | POLL_CLOSED`
+ override fun pollInternal(): Any? {
while (true) {
- val waiter = takeFirstSendOrPeekClosed() ?: return null
- if (waiter.tryResumeSend()) return waiter.element
- }
- }
-
- override fun iterator(): ChannelIterator<E> = Iterator(this)
-
- private fun takeFirstSendOrPeekClosed(): Send<E>? =
- queue.removeFirstIfIsInstanceOfOrPeekIf<Send<E>> { it is Closed<*> }
-
- private companion object {
- val IS_UNKNOWN = 0
- val IS_HAS_VALUE = 1
- val IS_CLOSED = 2
- }
-
- private class Iterator<E>(val channel: RendezvousChannel<E>) : ChannelIterator<E> {
- var state: Int = IS_UNKNOWN
- var value: E? = null
-
- suspend override fun hasNext(): Boolean {
- when (state) {
- IS_HAS_VALUE -> return true
- IS_CLOSED -> return false
+ val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY
+ val token = send.tryResumeSend()
+ if (token != null) {
+ send.completeResumeSend(token)
+ return send.pollResult
}
- // fast path if send is already waiting for rendezvous
- while (true) { // loop while there are send waiters
- val send = channel.takeFirstSendOrPeekClosed() ?: break
- if (send.tryResumeSend()) return updateStateWithSend(send)
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val receive = ReceiveHasNext(this, cont) // must allocate fresh element on each loop
- if (channel.queue.addLastIfPrev(receive) { it !is Send<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(receive)
- return@sc
- }
- // hm... there are already senders (maybe), so try taking first
- channel.takeFirstSendOrPeekClosed()?.also { send ->
- if (send.tryResumeSend()) {
- cont.resume(updateStateWithSend(send))
- return@sc
- }
- }
- }
- }
- }
-
- private fun updateStateWithSend(send: Send<E>): Boolean {
- if (send is Closed<*>) {
- state = IS_CLOSED
- return false
- } else {
- state = IS_HAS_VALUE
- value = send.element
- return true
- }
- }
-
- suspend override fun next(): E {
- when (state) {
- IS_HAS_VALUE -> {
- val value = this.value as E
- this.state = IS_UNKNOWN
- this.value = null
- return value
- }
- IS_CLOSED -> throw ClosedReceiveChannelException()
- }
- // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as IS_UNKNOWN)
- return channel.receive()
- }
- }
-
- private abstract class Send<out E> : LockFreeLinkedListNode() {
- abstract val element: E
- abstract val elementOrNull: E?
- abstract fun tryResumeSend(): Boolean
- abstract fun resumeWithElement(cont: CancellableContinuation<E>)
- abstract fun resumeWithElementOrNull(cont: CancellableContinuation<E?>)
- }
-
- private class SendElement<out E>(
- val cont: CancellableContinuation<Unit>,
- override val element: E
- ) : Send<E>() {
- override val elementOrNull: E? get() = element
- override fun tryResumeSend(): Boolean = cont.tryResume(Unit)
- override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resume(element)
- override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(element)
- }
-
- private interface ReceiveOrClosed<in E> {
- fun tryResumeReceiveFromSend(value: E): Boolean
- fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean
- }
-
- private class Closed<E> : Send<E>(), ReceiveOrClosed<E> {
- override val element: E get() = throw ClosedReceiveChannelException()
- override val elementOrNull: E? get() = null
- override fun tryResumeSend(): Boolean = true
- override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resumeWithException(ClosedReceiveChannelException())
- override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(null)
- override fun tryResumeReceiveFromSend(value: E): Boolean = throw ClosedSendChannelException()
- override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
- cont.resumeWithException(ClosedSendChannelException())
- return true
- }
- }
-
- private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
- override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
- if (tryResumeReceiveFromSend(value)) {
- cont.resume(Unit)
- return true
- }
- return false
- }
- abstract fun resumeReceiveClosed()
- }
-
- private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
- override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
- override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
- }
-
- private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
- override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
- override fun resumeReceiveClosed() = cont.resume(null)
- }
-
- private class ReceiveHasNext<E>(
- val iterator: Iterator<E>,
- val cont: CancellableContinuation<Boolean>
- ) : Receive<E>(), (Any?) -> Unit {
- override fun tryResumeReceiveFromSend(value: E): Boolean {
- iterator.value = value // tentative value (may fail to resume with it)
- return cont.tryResume(true, this)
- }
-
- override fun resumeReceiveClosed() {
- cont.tryResume(false, this)
- }
-
- // callback for tryResume onSuccess
- override fun invoke(hasNext: Any?) {
- hasNext as Boolean // try assertion -- that is the value we pass
- iterator.state = if (hasNext) IS_HAS_VALUE else IS_CLOSED
- if (!hasNext) iterator.value = null // cleanup tentative value
}
}
}