ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 4daf5f1..1b1adad 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -22,13 +22,16 @@
val isCancelled: Boolean
/**
- * Tries to resume this continuation with a given value and returns `true` if it was successful,
- * or `false` otherwise (it was already resumed or cancelled).
- *
- * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
- * is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
+ * Tries to resume this continuation with a given value and returns non-null object token if it was successful,
+ * or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
+ * [completeResume] must be invoked with it.
*/
- fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+ fun tryResume(value: T): Any?
+
+ /**
+ * Completes the execution of [tryResume] on its non-null result.
+ */
+ fun completeResume(token: Any)
/**
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
@@ -104,16 +107,20 @@
override val isCancelled: Boolean
get() = getState() is Cancelled
- override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+ override fun tryResume(value: T): Any? {
while (true) { // lock-free loop on state
val state = getState() // atomic read
when (state) {
- is Active -> if (updateState(state, value, onSuccess)) return true
- else -> return false // cannot resume -- not active anymore
+ is Active -> if (tryUpdateState(state, value)) return state
+ else -> return null // cannot resume -- not active anymore
}
}
}
+ override fun completeResume(token: Any) {
+ completeUpdateState(token, getState())
+ }
+
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?) {
val decision = this.decision // volatile read
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index d34edc8..9147ede 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -213,16 +213,24 @@
/**
* Tries to update current [state][getState] of this job.
*/
- fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
+ fun updateState(expect: Any, update: Any?): Boolean {
+ if (!tryUpdateState(expect, update)) return false
+ completeUpdateState(expect, update)
+ return true
+ }
+
+ fun tryUpdateState(expect: Any, update: Any?): Boolean {
require(expect is Active && update !is Active) // only active -> inactive transition is allowed
if (!STATE.compareAndSet(this, expect, update)) return false
// #1. Update linked state before invoking completion handlers
onStateUpdate(update)
// #2. Unregister from parent job
registration?.unregister() // volatile read registration _after_ state was updated
- // #3. Additional (optional) callback
- onSuccess?.invoke(update)
- // #4. Invoke completion handlers
+ return true // continues in completeUpdateState
+ }
+
+ fun completeUpdateState(expect: Any, update: Any?) {
+ // #3. Invoke completion handlers
val reason = (update as? CompletedExceptionally)?.cancelReason
var completionException: Throwable? = null
when (expect) {
@@ -244,10 +252,9 @@
// otherwise -- do nothing (Empty)
else -> check(expect == Empty)
}
- // #5. Do other (overridable) processing after completion handlers
+ // #4. Do other (overridable) processing after completion handlers
completionException?.let { handleCompletionException(it) }
afterCompletion(update)
- return true
}
final override val isActive: Boolean get() = state is Active
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
new file mode 100644
index 0000000..7cd231f
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -0,0 +1,317 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CancellableContinuation
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.removeOnCompletion
+import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+
+/**
+ * Abstract channel. It is a base class for buffered and unbuffered channels.
+ */
+public abstract class AbstractChannel<E> : Channel<E> {
+ private val queue = LockFreeLinkedListHead()
+
+ // ------ extension points for buffered channels ------
+
+ protected abstract val hasBuffer: Boolean
+ protected abstract val isBufferEmpty: Boolean
+ protected abstract val isBufferFull: Boolean
+
+ /**
+ * Tries to add element to buffer or to queued receiver.
+ * Return type is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`.
+ */
+ protected abstract fun offerInternal(element: E): Int
+
+ /**
+ * Tries to remove element from buffer or from queued sender.
+ * Return type is `E | POLL_EMPTY | POLL_CLOSED`
+ */
+ protected abstract fun pollInternal(): Any?
+
+
+ // ------ state function for concrete implementations ------
+
+ protected val isClosedTokenFirstInQueue: Boolean get() = queue.next() is Closed<*>
+
+ // ------ SendChannel ------
+
+ override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
+ override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*> && isBufferFull
+
+ suspend override fun send(element: E) {
+ // fast path -- try offer non-blocking
+ if (offer(element)) return
+ // slow-path does suspend
+ return sendSuspend(element)
+ }
+
+ override fun offer(element: E): Boolean =
+ when (offerInternal(element)) {
+ OFFER_SUCCESS -> true
+ OFFER_FAILED -> false
+ else -> throw ClosedSendChannelException()
+ }
+
+ private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
+ val send = SendElement(cont, element)
+ loop@ while (true) {
+ if (enqueueSend(send)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(send)
+ return@sc
+ }
+ // hm... something is not right. try to offer
+ when (offerInternal(element)) {
+ OFFER_SUCCESS -> {
+ cont.resume(Unit)
+ return@sc
+ }
+ OFFER_CLOSED -> {
+ cont.resumeWithException(ClosedSendChannelException())
+ return@sc
+ }
+ }
+ }
+ }
+
+ private fun enqueueSend(send: SendElement) =
+ if (hasBuffer)
+ queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed<*> }, { isBufferFull })
+ else
+ queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
+
+ override fun close() {
+ while (true) {
+ val receive = takeFirstReceiveOrPeekClosed()
+ if (receive == null) {
+ // queue empty or has only senders -- try add last "Closed" item to the queue
+ if (queue.addLastIfPrev(Closed<E>(), { it !is ReceiveOrClosed<*> })) return
+ continue // retry on failure
+ }
+ if (receive is Closed<*>) return // already marked as closed -- nothing to do
+ receive as Receive<E> // type assertion
+ receive.resumeReceiveClosed()
+ }
+ }
+
+ protected fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
+ queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+
+ // ------ ReceiveChannel ------
+
+ override val isClosedForReceive: Boolean get() = isClosedTokenFirstInQueue && isBufferEmpty
+ override val isEmpty: Boolean get() = queue.next() !is Send && isBufferEmpty
+
+ @Suppress("UNCHECKED_CAST")
+ suspend override fun receive(): E {
+ // fast path -- try poll non-blocking
+ val result = pollInternal()
+ if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+ if (result !== POLL_EMPTY) return result as E
+ // slow-path does suspend
+ return receiveSuspend()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
+ val receive = ReceiveNonNull(cont)
+ while (true) {
+ if (enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ val result = pollInternal()
+ if (result === POLL_CLOSED) {
+ cont.resumeWithException(ClosedReceiveChannelException())
+ return@sc
+ }
+ if (result !== POLL_EMPTY) {
+ cont.resume(result as E)
+ return@sc
+ }
+ }
+ }
+
+ private fun enqueueReceive(receive: Receive<E>) =
+ if (hasBuffer)
+ queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
+ else
+ queue.addLastIfPrev(receive, { it !is Send })
+
+ @Suppress("UNCHECKED_CAST")
+ suspend override fun receiveOrNull(): E? {
+ // fast path -- try poll non-blocking
+ val result = pollInternal()
+ if (result === POLL_CLOSED) return null
+ if (result !== POLL_EMPTY) return result as E
+ // slow-path does suspend
+ return receiveOrNullSuspend()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
+ val receive = ReceiveOrNull(cont)
+ while (true) {
+ if (enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ val result = pollInternal()
+ if (result === POLL_CLOSED) {
+ cont.resume(null)
+ return@sc
+ }
+ if (result !== POLL_EMPTY) {
+ cont.resume(result as E)
+ return@sc
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun poll(): E? {
+ val result = pollInternal()
+ return if (result === POLL_EMPTY || result === POLL_CLOSED) null else result as E
+ }
+
+ override fun iterator(): ChannelIterator<E> = Iterator(this)
+
+ protected fun takeFirstSendOrPeekClosed(): Send? =
+ queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
+
+ protected companion object {
+ const val OFFER_SUCCESS = 0
+ const val OFFER_FAILED = 1
+ const val OFFER_CLOSED = 2
+
+ val POLL_EMPTY: Any = Marker("POLL_EMPTY")
+ val POLL_CLOSED: Any = Marker("POLL_CLOSED")
+ }
+
+ // for debugging
+ private class Marker(val string: String) {
+ override fun toString(): String = string
+ }
+
+ private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
+ var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
+
+ suspend override fun hasNext(): Boolean {
+ // check for repeated hasNext
+ if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+ // fast path -- try poll non-blocking
+ result = channel.pollInternal()
+ if (result !== POLL_EMPTY) return result !== POLL_CLOSED
+ // slow-path does suspend
+ return hasNextSuspend()
+ }
+
+ private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutine(true) sc@ { cont ->
+ val receive = ReceiveHasNext(this, cont)
+ while (true) {
+ if (channel.enqueueReceive(receive)) {
+ cont.initCancellability() // make it properly cancellable
+ cont.removeOnCompletion(receive)
+ return@sc
+ }
+ // hm... something is not right. try to poll
+ result = channel.pollInternal()
+ if (result === POLL_CLOSED) {
+ cont.resume(false)
+ return@sc
+ }
+ if (result !== POLL_EMPTY) {
+ cont.resume(true)
+ return@sc
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ suspend override fun next(): E {
+ if (result === POLL_CLOSED) throw ClosedReceiveChannelException()
+ if (result !== POLL_EMPTY) {
+ val value = this.result as E
+ this.result = POLL_EMPTY
+ return value
+ }
+ // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
+ return channel.receive()
+ }
+ }
+
+ protected interface Send {
+ val pollResult: Any? // E | POLL_CLOSED
+ fun tryResumeSend(): Any?
+ fun completeResumeSend(token: Any)
+ }
+
+ protected interface ReceiveOrClosed<in E> {
+ val offerResult: Int // OFFER_SUCCESS | OFFER_CLOSED
+ fun tryResumeReceive(value: E): Any?
+ fun completeResumeReceive(token: Any)
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private class SendElement(
+ val cont: CancellableContinuation<Unit>,
+ override val pollResult: Any?
+ ) : LockFreeLinkedListNode(), Send {
+ override fun tryResumeSend(): Any? = cont.tryResume(Unit)
+ override fun completeResumeSend(token: Any) = cont.completeResume(token)
+ }
+
+ private class Closed<in E> : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+ override val offerResult get() = OFFER_CLOSED
+ override val pollResult get() = POLL_CLOSED
+ override fun tryResumeSend(): Boolean = true
+ override fun completeResumeSend(token: Any) {}
+ override fun tryResumeReceive(value: E): Any? = throw ClosedSendChannelException()
+ override fun completeResumeReceive(token: Any) = throw ClosedSendChannelException()
+ }
+
+ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
+ override val offerResult get() = OFFER_SUCCESS
+ abstract fun resumeReceiveClosed()
+ }
+
+ private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
+ override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+ override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+ override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
+ }
+
+ private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
+ override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+ override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+ override fun resumeReceiveClosed() = cont.resume(null)
+ }
+
+ private class ReceiveHasNext<E>(
+ val iterator: Iterator<E>,
+ val cont: CancellableContinuation<Boolean>
+ ) : Receive<E>() {
+ override fun tryResumeReceive(value: E): Any? {
+ val token = cont.tryResume(true)
+ if (token != null) iterator.result = value
+ return token
+ }
+
+ override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+
+ override fun resumeReceiveClosed() {
+ val token = cont.tryResume(false)
+ if (token != null) {
+ iterator.result = POLL_CLOSED
+ cont.completeResume(token)
+ }
+ }
+ }
+}
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
new file mode 100644
index 0000000..cff2894
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -0,0 +1,99 @@
+package kotlinx.coroutines.experimental.channels
+
+import java.util.concurrent.locks.ReentrantLock
+
+/**
+ * Channel with array buffer of a fixed [capacity].
+ * Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
+ *
+ * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
+ * The lists of suspended senders or receivers are lock-free.
+ */
+public class ArrayChannel<E>(val capacity: Int) : AbstractChannel<E>() {
+ init {
+ check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
+ }
+
+ private val lock = ReentrantLock()
+ private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
+ private var head: Int = 0
+ @Volatile
+ private var size: Int = 0
+
+ private inline fun <T> locked(block: () -> T): T {
+ lock.lock()
+ return try { block() }
+ finally { lock.unlock() }
+ }
+
+ override val hasBuffer: Boolean get() = true
+ override val isBufferEmpty: Boolean get() = size == 0
+ override val isBufferFull: Boolean get() = size == capacity
+
+ override fun offerInternal(element: E): Int {
+ var token: Any? = null
+ var receive: ReceiveOrClosed<E>? = null
+ locked {
+ val size = this.size
+ if (isClosedForSend) return OFFER_CLOSED
+ if (size < capacity) {
+ // tentatively put element to buffer
+ this.size = size + 1 // update size before checking queue (!!!)
+ // check for receivers that were waiting on empty queue
+ if (size == 0) {
+ while (true) {
+ receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
+ token = receive!!.tryResumeReceive(element)
+ if (token != null) {
+ this.size = size // restore size
+ return@locked
+ }
+ }
+ }
+ buffer[(head + size) % capacity] = element // actually queue element
+ return OFFER_SUCCESS
+ }
+ // size == capacity: full
+ return OFFER_FAILED
+ }
+ // breaks here if offer meets receiver
+ receive!!.completeResumeReceive(token!!)
+ return receive!!.offerResult
+ }
+
+ // result is `E | POLL_EMPTY | POLL_CLOSED`
+ override fun pollInternal(): Any? {
+ var token: Any? = null
+ var send: Send? = null
+ var result: Any? = null
+ locked {
+ val size = this.size
+ if (size == 0) return if (isClosedTokenFirstInQueue) POLL_CLOSED else POLL_EMPTY
+ // size > 0: not empty -- retrieve element
+ result = buffer[head]
+ buffer[head] = null
+ this.size = size - 1 // update size before checking queue (!!!)
+ // check for senders that were waiting on full queue
+ var replacement: Any? = POLL_EMPTY
+ if (size == capacity) {
+ while (true) {
+ send = takeFirstSendOrPeekClosed() ?: break
+ token = send!!.tryResumeSend()
+ if (token != null) {
+ replacement = send!!.pollResult
+ break
+ }
+ }
+ }
+ if (replacement !== POLL_EMPTY && replacement !== POLL_CLOSED) {
+ this.size = size // restore size
+ buffer[(head + size) % capacity] = replacement
+ }
+ head = (head + 1) % capacity
+ }
+ // complete send the we're taken replacement from
+ if (token != null)
+ send!!.completeResumeSend(token!!)
+ return result
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index ef96c21..fae4060 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -102,7 +102,7 @@
* Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
* or [isClosedForReceive].
*/
- public fun pool(): E?
+ public fun poll(): E?
/**
* Returns new iterator to receive elements from this channels using `for` loop.
@@ -153,7 +153,23 @@
* Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
* but it has suspending operations instead of blocking ones and it can be closed.
*/
-public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
+public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
+ /**
+ * Factory for channels.
+ */
+ public companion object Factory {
+ /**
+ * Creates a channel with specified buffer capacity (or without a buffer by default).
+ */
+ public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
+ check(capacity >= 0) { "Channel capacity cannot be negative, but $capacity was specified" }
+ return if (capacity == 0)
+ RendezvousChannel()
+ else
+ ArrayChannel(capacity)
+ }
+ }
+}
/**
* Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index 57095cc..645228e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -1,283 +1,39 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.CancellableContinuation
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
-import kotlinx.coroutines.experimental.removeOnCompletion
-import kotlinx.coroutines.experimental.suspendCancellableCoroutine
-
/**
* Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
* to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
* until another coroutine invokes [receive] and [receive] suspends until another coroutine invokes [send].
+ *
+ * This implementation is fully lock-free.
*/
-public class RendezvousChannel<E> : Channel<E> {
- private val queue = LockFreeLinkedListHead()
+public class RendezvousChannel<E> : AbstractChannel<E>() {
- // ------ SendChannel ------
+ override val hasBuffer: Boolean get() = false
+ override val isBufferEmpty: Boolean get() = true
+ override val isBufferFull: Boolean get() = true
- override val isClosedForSend: Boolean get() = queue.prev() is Closed<*>
-
- override val isFull: Boolean get() = queue.next() !is ReceiveOrClosed<*>
-
- suspend override fun send(element: E) {
- // fast path if receive is already waiting for rendezvous
- while (true) { // loop while there are receive waiters
- val receive = takeFirstReceiveOrPeekClosed() ?: break
- if (receive.tryResumeReceiveFromSend(element)) return // resumed it successfully
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val send = SendElement(cont, element) // must allocate fresh element on each loop
- if (queue.addLastIfPrev(send) { it !is Receive<*> && it !is Closed<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(send)
- return@sc
- }
- // hm... there are already receivers (maybe), so try taking first
- takeFirstReceiveOrPeekClosed()?.also { receive ->
- if (receive.tryResumeReceiveFromSend(element, cont)) return@sc
- }
- }
- }
- }
-
- override fun offer(element: E): Boolean {
- takeFirstReceiveOrPeekClosed()?.apply {
- tryResumeReceiveFromSend(element)
- return true
- }
- return false
- }
-
- override fun close() {
+ // result is `OFFER_SUCCESS | OFFER_FAILED | OFFER_CLOSED`
+ override fun offerInternal(element: E): Int {
while (true) {
- val receive = takeFirstReceiveOrPeekClosed()
- if (receive == null) {
- // queue empty or has only senders -- try add last "Closed" item to the queue
- if (queue.addLastIfPrev(Closed<E>()) { it !is Closed<*> }) return
- }
- if (receive is Closed<*>) return // already marked as closed -- nothing to do
- receive as Receive<E> // type assertion
- receive.resumeReceiveClosed()
- }
- }
-
- private fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
- queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
-
- // ------ ReceiveChannel ------
-
- override val isClosedForReceive: Boolean get() = queue.next() is Closed<*>
-
- override val isEmpty: Boolean get() = queue.next() !is Send<*>
-
- suspend override fun receive(): E {
- // fast path if send is already waiting for rendezvous or closed
- while (true) { // loop while there are send waiters
- val send = takeFirstSendOrPeekClosed() ?: break
- if (send.tryResumeSend()) return send.element // resumed it successfully
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val receive = ReceiveNonNull(cont) // must allocate fresh element on each loop
- if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(receive)
- return@sc
- }
- // hm... there are already senders (maybe), so try taking first
- takeFirstSendOrPeekClosed()?.also { send ->
- if (send.tryResumeSend()) {
- send.resumeWithElement(cont)
- return@sc
- }
- }
+ val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
+ val token = receive.tryResumeReceive(element)
+ if (token != null) {
+ receive.completeResumeReceive(token)
+ return receive.offerResult
}
}
}
- suspend override fun receiveOrNull(): E? {
- // fast path if send is already waiting for rendezvous or closed
- while (true) { // loop while there are send waiters
- val send = takeFirstSendOrPeekClosed() ?: break
- if (send.tryResumeSend()) return send.elementOrNull // resumed it successfully
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val receive = ReceiveOrNull(cont) // must allocate fresh element on each loop
- if (queue.addLastIfPrev(receive) { it !is Send<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(receive)
- return@sc
- }
- // hm... there are already senders (maybe), so try taking first
- takeFirstSendOrPeekClosed()?.also { send ->
- if (send.tryResumeSend()) {
- send.resumeWithElementOrNull(cont)
- return@sc
- }
- }
- }
- }
- }
-
- override fun pool(): E? {
+ // result is `E | POLL_EMPTY | POLL_CLOSED`
+ override fun pollInternal(): Any? {
while (true) {
- val waiter = takeFirstSendOrPeekClosed() ?: return null
- if (waiter.tryResumeSend()) return waiter.element
- }
- }
-
- override fun iterator(): ChannelIterator<E> = Iterator(this)
-
- private fun takeFirstSendOrPeekClosed(): Send<E>? =
- queue.removeFirstIfIsInstanceOfOrPeekIf<Send<E>> { it is Closed<*> }
-
- private companion object {
- val IS_UNKNOWN = 0
- val IS_HAS_VALUE = 1
- val IS_CLOSED = 2
- }
-
- private class Iterator<E>(val channel: RendezvousChannel<E>) : ChannelIterator<E> {
- var state: Int = IS_UNKNOWN
- var value: E? = null
-
- suspend override fun hasNext(): Boolean {
- when (state) {
- IS_HAS_VALUE -> return true
- IS_CLOSED -> return false
+ val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY
+ val token = send.tryResumeSend()
+ if (token != null) {
+ send.completeResumeSend(token)
+ return send.pollResult
}
- // fast path if send is already waiting for rendezvous
- while (true) { // loop while there are send waiters
- val send = channel.takeFirstSendOrPeekClosed() ?: break
- if (send.tryResumeSend()) return updateStateWithSend(send)
- }
- // slow-path does suspend
- return suspendCancellableCoroutine(true) sc@ { cont ->
- while (true) {
- val receive = ReceiveHasNext(this, cont) // must allocate fresh element on each loop
- if (channel.queue.addLastIfPrev(receive) { it !is Send<*> }) {
- cont.initCancellability() // make it properly cancellable
- cont.removeOnCompletion(receive)
- return@sc
- }
- // hm... there are already senders (maybe), so try taking first
- channel.takeFirstSendOrPeekClosed()?.also { send ->
- if (send.tryResumeSend()) {
- cont.resume(updateStateWithSend(send))
- return@sc
- }
- }
- }
- }
- }
-
- private fun updateStateWithSend(send: Send<E>): Boolean {
- if (send is Closed<*>) {
- state = IS_CLOSED
- return false
- } else {
- state = IS_HAS_VALUE
- value = send.element
- return true
- }
- }
-
- suspend override fun next(): E {
- when (state) {
- IS_HAS_VALUE -> {
- val value = this.value as E
- this.state = IS_UNKNOWN
- this.value = null
- return value
- }
- IS_CLOSED -> throw ClosedReceiveChannelException()
- }
- // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as IS_UNKNOWN)
- return channel.receive()
- }
- }
-
- private abstract class Send<out E> : LockFreeLinkedListNode() {
- abstract val element: E
- abstract val elementOrNull: E?
- abstract fun tryResumeSend(): Boolean
- abstract fun resumeWithElement(cont: CancellableContinuation<E>)
- abstract fun resumeWithElementOrNull(cont: CancellableContinuation<E?>)
- }
-
- private class SendElement<out E>(
- val cont: CancellableContinuation<Unit>,
- override val element: E
- ) : Send<E>() {
- override val elementOrNull: E? get() = element
- override fun tryResumeSend(): Boolean = cont.tryResume(Unit)
- override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resume(element)
- override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(element)
- }
-
- private interface ReceiveOrClosed<in E> {
- fun tryResumeReceiveFromSend(value: E): Boolean
- fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean
- }
-
- private class Closed<E> : Send<E>(), ReceiveOrClosed<E> {
- override val element: E get() = throw ClosedReceiveChannelException()
- override val elementOrNull: E? get() = null
- override fun tryResumeSend(): Boolean = true
- override fun resumeWithElement(cont: CancellableContinuation<E>) = cont.resumeWithException(ClosedReceiveChannelException())
- override fun resumeWithElementOrNull(cont: CancellableContinuation<E?>) = cont.resume(null)
- override fun tryResumeReceiveFromSend(value: E): Boolean = throw ClosedSendChannelException()
- override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
- cont.resumeWithException(ClosedSendChannelException())
- return true
- }
- }
-
- private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
- override fun tryResumeReceiveFromSend(value: E, cont: CancellableContinuation<Unit>): Boolean {
- if (tryResumeReceiveFromSend(value)) {
- cont.resume(Unit)
- return true
- }
- return false
- }
- abstract fun resumeReceiveClosed()
- }
-
- private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
- override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
- override fun resumeReceiveClosed() = cont.resumeWithException(ClosedReceiveChannelException())
- }
-
- private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
- override fun tryResumeReceiveFromSend(value: E): Boolean = cont.tryResume(value)
- override fun resumeReceiveClosed() = cont.resume(null)
- }
-
- private class ReceiveHasNext<E>(
- val iterator: Iterator<E>,
- val cont: CancellableContinuation<Boolean>
- ) : Receive<E>(), (Any?) -> Unit {
- override fun tryResumeReceiveFromSend(value: E): Boolean {
- iterator.value = value // tentative value (may fail to resume with it)
- return cont.tryResume(true, this)
- }
-
- override fun resumeReceiveClosed() {
- cont.tryResume(false, this)
- }
-
- // callback for tryResume onSuccess
- override fun invoke(hasNext: Any?) {
- hasNext as Boolean // try assertion -- that is the value we pass
- iterator.state = if (hasNext) IS_HAS_VALUE else IS_CLOSED
- if (!hasNext) iterator.value = null // cleanup tentative value
}
}
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
new file mode 100644
index 0000000..2f837e5
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -0,0 +1,133 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class ArrayChannelTest : TestBase() {
+ @Test
+ fun testSimple() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull)
+ expect(1)
+ val sender = launch(context) {
+ expect(4)
+ q.send(1) // success -- buffered
+ check(!q.isEmpty && q.isFull)
+ expect(5)
+ q.send(2) // suspends (buffer full)
+ expect(9)
+ }
+ expect(2)
+ val receiver = launch(context) {
+ expect(6)
+ check(q.receive() == 1) // does not suspend -- took from buffer
+ check(!q.isEmpty && q.isFull) // waiting sender's element moved to buffer
+ expect(7)
+ check(q.receive() == 2) // does not suspend (takes from sender)
+ expect(8)
+ }
+ expect(3)
+ sender.join()
+ receiver.join()
+ check(q.isEmpty && !q.isFull)
+ finish(10)
+ }
+
+ @Test
+ fun testStress() = runBlocking {
+ val n = 100_000
+ val q = ArrayChannel<Int>(1)
+ val sender = launch(context) {
+ for (i in 1..n) q.send(i)
+ expect(2)
+ }
+ val receiver = launch(context) {
+ for (i in 1..n) check(q.receive() == i)
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+
+ @Test
+ fun testClosedBufferedReceiveOrNull() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
+ expect(1)
+ launch(context) {
+ expect(5)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ assertEquals(42, q.receiveOrNull())
+ expect(6)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ assertEquals(null, q.receiveOrNull())
+ expect(7)
+ }
+ expect(2)
+ q.send(42) // buffers
+ expect(3)
+ q.close() // goes on
+ expect(4)
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && !q.isClosedForReceive)
+ yield()
+ check(!q.isEmpty && !q.isFull && q.isClosedForSend && q.isClosedForReceive)
+ finish(8)
+ }
+
+ @Test
+ fun testClosedExceptions() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ expect(1)
+ launch(context) {
+ expect(4)
+ try { q.receive() }
+ catch (e: ClosedReceiveChannelException) {
+ expect(5)
+ }
+ }
+ expect(2)
+ q.close()
+ expect(3)
+ yield()
+ expect(6)
+ try { q.send(42) }
+ catch (e: ClosedSendChannelException) {
+ finish(7)
+ }
+ }
+
+ @Test
+ fun testOfferAndPool() = runBlocking {
+ val q = ArrayChannel<Int>(1)
+ assertTrue(q.offer(1))
+ expect(1)
+ launch(context) {
+ expect(3)
+ assertEquals(1, q.poll())
+ expect(4)
+ assertEquals(null, q.poll())
+ expect(5)
+ assertEquals(2, q.receive()) // suspends
+ expect(9)
+ assertEquals(3, q.poll())
+ expect(10)
+ assertEquals(null, q.poll())
+ expect(11)
+ }
+ expect(2)
+ yield()
+ expect(6)
+ assertTrue(q.offer(2))
+ expect(7)
+ assertTrue(q.offer(3))
+ expect(8)
+ assertFalse(q.offer(4))
+ yield()
+ finish(12)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
similarity index 86%
rename from kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
rename to kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index d444a5d..a395ce4 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelAtomicityStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -2,13 +2,22 @@
import kotlinx.coroutines.experimental.*
import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
import java.util.*
import kotlin.test.assertEquals
-class RendezvousChannelAtomicityStressTest {
+@RunWith(Parameterized::class)
+class ChannelAtomicCancelStressTest(val kind: TestChannelKind) {
+ companion object {
+ @Parameterized.Parameters(name = "{0}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
+ }
+
val TEST_DURATION = 3000L
- val channel = RendezvousChannel<Int>()
+ val channel = kind.create()
val senderDone = RendezvousChannel<Boolean>()
val receiverDone = RendezvousChannel<Boolean>()
@@ -25,7 +34,7 @@
lateinit var receiver: Job
@Test
- fun testStress() = runBlocking {
+ fun testAtomicCancelStress() = runBlocking {
val deadline = System.currentTimeMillis() + TEST_DURATION
launchSender()
launchReceiver()
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
new file mode 100644
index 0000000..32a33ca
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -0,0 +1,106 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+
+
+@RunWith(Parameterized::class)
+class ChannelSendReceiveStressTest(
+ val kind: TestChannelKind,
+ val nSenders: Int,
+ val nReceivers: Int
+) {
+ companion object {
+ @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> =
+ listOf(1, 2, 10).flatMap { nSenders ->
+ listOf(1, 6).flatMap { nReceivers ->
+ TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
+ }
+ }
+ }
+
+ val nEvents = 1_000_000
+
+ val channel = kind.create()
+ val sendersCompleted = AtomicInteger()
+ val receiversCompleted = AtomicInteger()
+ val dupes = AtomicInteger()
+ val received = ConcurrentHashMap<Int,Int>()
+ val receivedBy = IntArray(nReceivers)
+
+ @Test
+ fun testSendReceiveStress() = runBlocking {
+ val receivers = List(nReceivers) { receiverIndex ->
+ // different event receivers use different code
+ launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
+ when (receiverIndex % 3) {
+ 0 -> doReceive(receiverIndex)
+ 1 -> doReceiveOrNull(receiverIndex)
+ 2 -> doIterator(receiverIndex)
+ }
+ receiversCompleted.incrementAndGet()
+ }
+ }
+ val senders = List(nSenders) { senderIndex ->
+ launch(CommonPool + CoroutineName("sender$senderIndex")) {
+ for (i in senderIndex until nEvents step nSenders)
+ channel.send(i)
+ sendersCompleted.incrementAndGet()
+ }
+ }
+ senders.forEach { it.join() }
+ channel.close()
+ receivers.forEach { it.join() }
+ println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
+ println("Completed successfully ${sendersCompleted.get()} sender coroutines")
+ println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
+ println(" Received ${received.size} events")
+ println(" Received dupes ${dupes.get()}")
+ repeat(nReceivers) { receiveIndex ->
+ println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
+ }
+ assertEquals(nSenders, sendersCompleted.get())
+ assertEquals(nReceivers, receiversCompleted.get())
+ assertEquals(0, dupes.get())
+ assertEquals(nEvents, received.size)
+ repeat(nReceivers) { receiveIndex ->
+ assertTrue(receivedBy[receiveIndex] > 0, "Each receiver should have received something")
+ }
+ }
+
+ private fun doReceived(receiverIndex: Int, event: Int) {
+ if (received.put(event, event) != null) {
+ println("Duplicate event $event")
+ dupes.incrementAndGet()
+ }
+ receivedBy[receiverIndex]++
+ }
+
+ private suspend fun doReceive(receiverIndex: Int) {
+ while (true) {
+ try { doReceived(receiverIndex, channel.receive()) }
+ catch (ex: ClosedReceiveChannelException) { break }
+ }
+
+ }
+
+ private suspend fun doReceiveOrNull(receiverIndex: Int) {
+ while (true) {
+ doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ }
+ }
+
+ private suspend fun doIterator(receiverIndex: Int) {
+ for (event in channel) {
+ doReceived(receiverIndex, event)
+ }
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
deleted file mode 100644
index a8269c3..0000000
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelMultipleReceiversStressTest.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-package kotlinx.coroutines.experimental.channels
-
-import kotlinx.coroutines.experimental.CommonPool
-import kotlinx.coroutines.experimental.join
-import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.runBlocking
-import org.junit.Test
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.test.assertEquals
-import kotlin.test.assertTrue
-
-class RendezvousChannelMultipleReceiversStressTest {
- val nEvents = 1_000_000
- val nReceivers = 6
-
- val channel = RendezvousChannel<Int>()
- val completedSuccessfully = AtomicInteger()
- val dupes = AtomicInteger()
- val received = ConcurrentHashMap<Int,Int>()
- val receivedBy = IntArray(nReceivers)
-
- @Test
- fun testStress() = runBlocking {
- val receivers = List(nReceivers) { receiverIndex ->
- // different event receivers use different code
- launch(CommonPool) {
- when (receiverIndex % 3) {
- 0 -> doReceive(receiverIndex)
- 1 -> doReceiveOrNull(receiverIndex)
- 2 -> doIterator(receiverIndex)
- }
- completedSuccessfully.incrementAndGet()
- }
- }
- repeat(nEvents) {
- channel.send(it)
- }
- channel.close()
- receivers.forEach { it.join() }
- println("Completed successfully ${completedSuccessfully.get()} coroutines")
- println(" Received ${received.size} events")
- println(" Received dupes ${dupes.get()}")
- repeat(nReceivers) { receiveIndex ->
- println(" Received by #$receiveIndex ${receivedBy[receiveIndex]}")
- }
- assertEquals(nReceivers, completedSuccessfully.get())
- assertEquals(0, dupes.get())
- assertEquals(nEvents, received.size)
- repeat(nReceivers) { receiveIndex ->
- assertTrue(receivedBy[receiveIndex] > nEvents / nReceivers / 2, "Should be balanced")
- }
- }
-
- private fun doReceived(event: Int) {
- if (received.put(event, event) != null) {
- println("Duplicate event $event")
- dupes.incrementAndGet()
- }
- }
-
- private suspend fun doReceive(receiverIndex: Int) {
- while (true) {
- try { doReceived(channel.receive()) }
- catch (ex: ClosedReceiveChannelException) { break }
- receivedBy[receiverIndex]++
- }
-
- }
-
- private suspend fun doReceiveOrNull(receiverIndex: Int) {
- while (true) {
- doReceived(channel.receiveOrNull() ?: break)
- receivedBy[receiverIndex]++
- }
- }
-
- private suspend fun doIterator(receiverIndex: Int) {
- for (event in channel) {
- doReceived(event)
- receivedBy[receiverIndex]++
- }
- }
-}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
index 669607e..741f5a9 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -103,14 +103,14 @@
expect(1)
launch(context) {
expect(3)
- assertEquals(null, q.pool())
+ assertEquals(null, q.poll())
expect(4)
assertEquals(2, q.receive())
expect(7)
- assertEquals(null, q.pool())
+ assertEquals(null, q.poll())
yield()
expect(9)
- assertEquals(3, q.pool())
+ assertEquals(3, q.poll())
expect(10)
}
expect(2)
@@ -123,4 +123,110 @@
q.send(3)
finish(11)
}
+
+ @Test
+ fun testIteratorClosed() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.close()
+ expect(4)
+ }
+ expect(2)
+ for (x in q) {
+ expectUnreached()
+ }
+ finish(5)
+ }
+
+ @Test
+ fun testIteratorOne() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.close()
+ expect(5)
+ }
+ expect(2)
+ for (x in q) {
+ expect(6)
+ assertEquals(1, x)
+ }
+ finish(7)
+ }
+
+ @Test
+ fun testIteratorOneWithYield() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.close()
+ expect(7)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ expect(5)
+ assertEquals(1, x)
+ }
+ finish(8)
+ }
+
+ @Test
+ fun testIteratorTwo() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1)
+ expect(4)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(6)
+ else -> expectUnreached()
+ }
+ }
+ finish(9)
+ }
+
+ @Test
+ fun testIteratorTwoWithYield() = runBlocking {
+ val q = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ q.send(1) // will suspend
+ expect(6)
+ q.send(2)
+ expect(7)
+ q.close()
+ expect(8)
+ }
+ expect(2)
+ yield() // yield to sender coroutine right before starting for loop
+ expect(4)
+ for (x in q) {
+ when (x) {
+ 1 -> expect(5)
+ 2 -> expect(9)
+ else -> expectUnreached()
+ }
+ }
+ finish(10)
+ }
}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..1464151
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,40 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.CommonPool
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import kotlin.test.assertEquals
+
+@RunWith(Parameterized::class)
+class SimpleSendReceiveTest(
+ val kind: TestChannelKind,
+ val n: Int
+) {
+ companion object {
+ @Parameterized.Parameters(name = "{0}, n={1}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = TestChannelKind.values().flatMap { kind ->
+ listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100, 1000).map { n ->
+ arrayOf<Any>(kind, n)
+ }
+ }
+ }
+
+ val channel = kind.create()
+
+ @Test
+ fun testSimpleSendReceive() = runBlocking {
+ launch(CommonPool) {
+ repeat(n) { channel.send(it) }
+ channel.close()
+ }
+ var received = 0
+ for (x in channel) {
+ assertEquals(received++, x)
+ }
+ assertEquals(n, received)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
new file mode 100644
index 0000000..e5533f4
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -0,0 +1,19 @@
+package kotlinx.coroutines.experimental.channels
+
+enum class TestChannelKind {
+ RENDEZVOUS {
+ override fun create(): Channel<Int> = RendezvousChannel<Int>()
+ override fun toString(): String = "RendezvousChannel"
+ },
+ ARRAY_1 {
+ override fun create(): Channel<Int> = ArrayChannel<Int>(1)
+ override fun toString(): String = "ArrayChannel(1)"
+ },
+ ARRAY_10 {
+ override fun create(): Channel<Int> = ArrayChannel<Int>(8)
+ override fun toString(): String = "ArrayChannel(8)"
+ }
+ ;
+
+ abstract fun create(): Channel<Int>
+}