Merged develop (js-channels work) into await-all
# Conflicts:
# common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
# common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
# common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
# common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
# common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
# common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
index c12501e..8743d2b 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt
@@ -18,7 +18,7 @@
public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
-public expect open class CancellationException(message: String) : IllegalStateException
+public expect open class CancellationException(message: String?) : IllegalStateException
public expect class JobCancellationException(
message: String,
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
similarity index 98%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index 4ce28da..697c6ef 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -18,6 +18,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
@@ -934,28 +935,28 @@
}
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+@JvmField internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+@JvmField internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED")
+@JvmField internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+@JvmField internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
+@JvmField internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE")
+@JvmField internal val NULL_VALUE: Any = Symbol("NULL_VALUE")
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
+@JvmField internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
/** @suppress **This is unstable API and it is subject to change.** */
-@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED")
+@JvmField internal val SEND_RESUMED = Symbol("SEND_RESUMED")
/**
* Represents sending waiter in the queue.
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
similarity index 97%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
index 5c4fa55..ffdf694 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
@@ -16,10 +16,9 @@
package kotlinx.coroutines.experimental.channels
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.selects.*
-import java.util.concurrent.*
-import java.util.concurrent.locks.*
-import kotlin.concurrent.*
/**
* Broadcast channel with array buffer of a fixed [capacity].
@@ -64,7 +63,7 @@
So read/writes to buffer need not be volatile
*/
- private val subs = CopyOnWriteArrayList<Subscriber<E>>()
+ private val subs = subscriberList<Subscriber<E>>()
override val isBufferAlwaysFull: Boolean get() = false
override val isBufferFull: Boolean get() = size >= capacity
@@ -132,7 +131,6 @@
// updates head if needed and optionally adds / removes subscriber under the same lock
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
- assert(addSub == null || removeSub == null) // only one of them can be specified
// update head in a tail rec loop
var send: Send? = null
var token: Any? = null
@@ -200,7 +198,8 @@
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
private val subLock = ReentrantLock()
- @Volatile @JvmField
+ @Volatile
+ @JvmField
var subHead: Long = 0 // guarded by subLock
override val isBufferAlwaysEmpty: Boolean get() = false
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
similarity index 98%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index fd8c79d..b118a89 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -16,9 +16,9 @@
package kotlinx.coroutines.experimental.channels
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.Volatile
import kotlinx.coroutines.experimental.selects.*
-import java.util.concurrent.locks.*
-import kotlin.concurrent.*
/**
* Channel with array buffer of a fixed [capacity].
@@ -249,4 +249,4 @@
override val bufferDebugString: String
get() = "(buffer:capacity=${buffer.size},size=$size)"
-}
\ No newline at end of file
+}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
similarity index 96%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
index e071aa2..0a9c589 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt
@@ -16,9 +16,10 @@
package kotlinx.coroutines.experimental.channels
+import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
-import java.io.Closeable
+import kotlinx.coroutines.experimental.internal.Closeable
/**
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
similarity index 100%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
similarity index 82%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
index a527be2..a23c2dc 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt
@@ -27,5 +27,12 @@
val channel: Channel<E>
get() = this
+ // Workaround for KT-23094
+ override suspend fun receive(): E = _channel.receive()
+
+ override suspend fun send(element: E) = _channel.send(element)
+
+ override suspend fun receiveOrNull(): E? = _channel.receiveOrNull()
+
override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
similarity index 98%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
index 87abbf5..ecb4262 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt
@@ -21,24 +21,6 @@
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
-// -------- Operations on SendChannel --------
-
-/**
- * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
- * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
- *
- * This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
- * so this function should not be used from coroutine.
- */
-public fun <E> SendChannel<E>.sendBlocking(element: E) {
- // fast path
- if (offer(element))
- return
- // slow path
- runBlocking {
- send(element)
- }
-}
// -------- Conversions to ReceiveChannel --------
@@ -120,7 +102,7 @@
if (exception == null) {
exception = e
} else {
- exception.addSuppressed(e)
+ exception.addSuppressedThrowable(e)
}
}
exception?.let { throw it }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
similarity index 95%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
index 95ae090..a8ec1ff 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt
@@ -16,12 +16,11 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.atomicfu.atomic
-import kotlinx.atomicfu.loop
-import kotlinx.coroutines.experimental.internal.Symbol
-import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
-import kotlinx.coroutines.experimental.selects.SelectClause2
-import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.internalAnnotations.*
+import kotlinx.coroutines.experimental.intrinsics.*
+import kotlinx.coroutines.experimental.selects.*
/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -162,8 +161,8 @@
check(i >= 0)
if (n == 1) return null
val update = arrayOfNulls<Subscriber<E>>(n - 1)
- System.arraycopy(list, 0, update, 0, i)
- System.arraycopy(list, i + 1, update, i, n - i - 1)
+ arraycopy(list, 0, update, 0, i)
+ arraycopy(list, i + 1, update, i, n - i - 1)
return update as Array<Subscriber<E>>
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
similarity index 99%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
index 7589179..ae98756 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt
@@ -80,4 +80,3 @@
}
}
}
-
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt
similarity index 100%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
similarity index 100%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
similarity index 100%
rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt
new file mode 100644
index 0000000..eac8bbf
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt
@@ -0,0 +1,6 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Cross-platform array copy. Overlaps of source and destination are not supported
+ */
+internal expect fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int)
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
index ce19e05..77b4175 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
@@ -66,8 +66,10 @@
final override fun perform(affected: Any?): Any? {
// make decision on status
var decision = this._consensus.value
- if (decision === NO_DECISION)
+ if (decision === NO_DECISION) {
decision = decide(prepare(affected as T))
+ }
+
complete(affected as T, decision)
return decision
}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt
new file mode 100644
index 0000000..3004a10
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt
@@ -0,0 +1,11 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Closeable entity.
+ * @suppress **Deprecated**
+ */
+@Deprecated("No replacement, see specific use")
+public expect interface Closeable {
+ @Deprecated("No replacement, see specific code")
+ fun close()
+}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt
new file mode 100644
index 0000000..3141212
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt
@@ -0,0 +1,18 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel]
+ * On JVM it's CopyOnWriteList and on JS it's MutableList.
+ *
+ * Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
+ */
+internal typealias SubscribersList<E> = MutableList<E>
+
+internal expect fun <E> subscriberList(): SubscribersList<E>
+
+internal expect class ReentrantLock() {
+ fun tryLock(): Boolean
+ fun unlock(): Unit
+}
+
+internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
index 9cc3e14..aab23d1 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt
@@ -16,12 +16,12 @@
package kotlinx.coroutines.experimental.internal
-import kotlin.jvm.*
-
/** @suppress **This is unstable API and it is subject to change.** */
public expect open class LockFreeLinkedListNode() {
public val isRemoved: Boolean
+ public val next: Any
public val nextNode: LockFreeLinkedListNode
+ public val prev: Any
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
@@ -57,11 +57,24 @@
val queue: LockFreeLinkedListNode
val node: T
protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
+ override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
+}
+
+/** @suppress **This is unstable API and it is subject to change.** */
+public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
+ val queue: LockFreeLinkedListNode
+ public val result: T
+ protected open fun validatePrepared(node: T): Boolean
+ protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
+ final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
/** @suppress **This is unstable API and it is subject to change.** */
public expect abstract class AbstractAtomicDesc : AtomicDesc {
- protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
final override fun prepare(op: AtomicOp<*>): Any?
final override fun complete(op: AtomicOp<*>, failure: Any?)
+ protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
+ protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
+ protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
+ protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
index 530d9d7..6f391e9 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt
@@ -21,6 +21,6 @@
*
* @suppress **This is unstable API and it is subject to change.**
*/
-public class Symbol(val symbol: String) {
+internal class Symbol(val symbol: String) {
override fun toString(): String = symbol
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
similarity index 70%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
index f48c787..b70a7a0 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -17,29 +17,26 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
-import org.hamcrest.core.*
-import org.junit.*
-import org.junit.Assert.*
-import java.io.IOException
import kotlin.coroutines.experimental.*
+import kotlin.test.*
class ArrayBroadcastChannelTest : TestBase() {
@Test
- fun testBasic() = runBlocking<Unit> {
+ fun testBasic() = runTest {
expect(1)
val broadcast = ArrayBroadcastChannel<Int>(1)
- assertThat(broadcast.isClosedForSend, IsEqual(false))
+ assertFalse(broadcast.isClosedForSend)
val first = broadcast.openSubscription()
launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
expect(2)
- assertThat(first.receive(), IsEqual(1)) // suspends
- assertThat(first.isClosedForReceive, IsEqual(false))
+ assertEquals(1, first.receive()) // suspends
+ assertFalse(first.isClosedForReceive)
expect(5)
- assertThat(first.receive(), IsEqual(2)) // suspends
- assertThat(first.isClosedForReceive, IsEqual(false))
+ assertEquals(2, first.receive()) // suspends
+ assertFalse(first.isClosedForReceive)
expect(10)
- assertThat(first.receiveOrNull(), IsNull()) // suspends
- assertThat(first.isClosedForReceive, IsEqual(true))
+ assertNull(first.receiveOrNull()) // suspends
+ assertTrue(first.isClosedForReceive)
expect(14)
}
expect(3)
@@ -47,14 +44,15 @@
expect(4)
yield() // to the first receiver
expect(6)
+
val second = broadcast.openSubscription()
launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
expect(7)
- assertThat(second.receive(), IsEqual(2)) // suspends
- assertThat(second.isClosedForReceive, IsEqual(false))
+ assertEquals(2, second.receive()) // suspends
+ assertFalse(second.isClosedForReceive)
expect(11)
- assertThat(second.receiveOrNull(), IsNull()) // suspends
- assertThat(second.isClosedForReceive, IsEqual(true))
+ assertNull(second.receiveOrNull()) // suspends
+ assertTrue(second.isClosedForReceive)
expect(15)
}
expect(8)
@@ -64,21 +62,21 @@
expect(12)
broadcast.close()
expect(13)
- assertThat(broadcast.isClosedForSend, IsEqual(true))
+ assertTrue(broadcast.isClosedForSend)
yield() // to first & second receivers
finish(16)
}
@Test
- fun testSendSuspend() = runBlocking {
+ fun testSendSuspend() = runTest {
expect(1)
val broadcast = ArrayBroadcastChannel<Int>(1)
val first = broadcast.openSubscription()
launch(coroutineContext) {
expect(4)
- assertThat(first.receive(), IsEqual(1))
+ assertEquals(1, first.receive())
expect(5)
- assertThat(first.receive(), IsEqual(2))
+ assertEquals(2, first.receive())
expect(6)
}
expect(2)
@@ -89,7 +87,7 @@
}
@Test
- fun testConcurrentSendCompletion() = runBlocking {
+ fun testConcurrentSendCompletion() = runTest {
expect(1)
val broadcast = ArrayBroadcastChannel<Int>(1)
val sub = broadcast.openSubscription()
@@ -105,17 +103,17 @@
broadcast.close()
// now must receive all 3 items
expect(6)
- assertThat(sub.isClosedForReceive, IsEqual(false))
+ assertFalse(sub.isClosedForReceive)
for (x in 1..3)
- assertThat(sub.receiveOrNull(), IsEqual(x))
+ assertEquals(x, sub.receiveOrNull())
// and receive close signal
- assertThat(sub.receiveOrNull(), IsNull())
- assertThat(sub.isClosedForReceive, IsEqual(true))
+ assertNull(sub.receiveOrNull())
+ assertTrue(sub.isClosedForReceive)
finish(7)
}
@Test
- fun testForgetUnsubscribed() = runBlocking {
+ fun testForgetUnsubscribed() = runTest {
expect(1)
val broadcast = ArrayBroadcastChannel<Int>(1)
broadcast.send(1)
@@ -125,7 +123,7 @@
val sub = broadcast.openSubscription()
launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
expect(3)
- assertThat(sub.receive(), IsEqual(4)) // suspends
+ assertEquals(4, sub.receive()) // suspends
expect(5)
}
expect(4)
@@ -135,7 +133,7 @@
}
@Test
- fun testReceiveFullAfterClose() = runBlocking<Unit> {
+ fun testReceiveFullAfterClose() = runTest {
val channel = BroadcastChannel<Int>(10)
val sub = channel.openSubscription()
// generate into buffer & close
@@ -149,7 +147,7 @@
}
@Test
- fun testCloseSubDuringIteration() = runBlocking<Unit> {
+ fun testCloseSubDuringIteration() = runTest {
val channel = BroadcastChannel<Int>(1)
// launch generator (for later) in this context
launch(coroutineContext) {
@@ -162,29 +160,29 @@
sub.consumeEach {
check(it == ++expected)
if (it == 2) {
- sub.close()
+ sub.cancel()
}
}
check(expected == 2)
}
@Test
- fun testReceiveFromClosedSub() = runTest(
- expected = { it is ClosedReceiveChannelException }
- ) {
+ fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
val channel = BroadcastChannel<Int>(1)
val sub = channel.openSubscription()
assertFalse(sub.isClosedForReceive)
- sub.close()
+ sub.cancel()
assertTrue(sub.isClosedForReceive)
sub.receive()
}
- @Test(expected = IOException::class)
- fun testCancelWithCause() = runBlocking<Unit> {
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
val channel = BroadcastChannel<Int>(1)
val subscription = channel.openSubscription()
- subscription.cancel(IOException())
+ subscription.cancel(TestException())
subscription.receiveOrNull()
}
+
+ private class TestException : Exception()
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
similarity index 83%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
index fadf340..170e579 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt
@@ -17,14 +17,12 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
-import org.junit.*
-import org.junit.Assert.*
-import java.io.*
import kotlin.coroutines.experimental.*
+import kotlin.test.*
class ArrayChannelTest : TestBase() {
@Test
- fun testSimple() = runBlocking {
+ fun testSimple() = runTest {
val q = ArrayChannel<Int>(1)
check(q.isEmpty && !q.isFull)
expect(1)
@@ -53,25 +51,7 @@
}
@Test
- fun testStress() = runBlocking {
- val n = 100_000
- val q = ArrayChannel<Int>(1)
- val sender = launch(coroutineContext) {
- for (i in 1..n) q.send(i)
- expect(2)
- }
- val receiver = launch(coroutineContext) {
- for (i in 1..n) check(q.receive() == i)
- expect(3)
- }
- expect(1)
- sender.join()
- receiver.join()
- finish(4)
- }
-
- @Test
- fun testClosedBufferedReceiveOrNull() = runBlocking {
+ fun testClosedBufferedReceiveOrNull() = runTest {
val q = ArrayChannel<Int>(1)
check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
@@ -96,7 +76,7 @@
}
@Test
- fun testClosedExceptions() = runBlocking {
+ fun testClosedExceptions() = runTest {
val q = ArrayChannel<Int>(1)
expect(1)
launch(coroutineContext) {
@@ -107,7 +87,8 @@
}
}
expect(2)
- q.close()
+
+ require(q.close())
expect(3)
yield()
expect(6)
@@ -118,7 +99,7 @@
}
@Test
- fun testOfferAndPool() = runBlocking {
+ fun testOfferAndPool() = runTest {
val q = ArrayChannel<Int>(1)
assertTrue(q.offer(1))
expect(1)
@@ -148,7 +129,7 @@
}
@Test
- fun testConsumeAll() = runBlocking {
+ fun testConsumeAll() = runTest {
val q = ArrayChannel<Int>(5)
for (i in 1..10) {
if (i <= 5) {
@@ -170,10 +151,12 @@
finish(12)
}
- @Test(expected = IOException::class)
- fun testCancelWithCause() = runBlocking<Unit> {
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
val channel = ArrayChannel<Int>(5)
- channel.cancel(IOException())
+ channel.cancel(TestException())
channel.receiveOrNull()
}
+
+ private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
new file mode 100644
index 0000000..2bbc4a1
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlin.test.*
+
+
+class BroadcastChannelFactoryTest {
+
+ @Test
+ fun testRendezvousChannelNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(0) }
+ }
+
+ @Test
+ fun testLinkedListChannelNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(Channel.UNLIMITED) }
+ }
+
+ @Test
+ fun testConflatedBroadcastChannel() {
+ assertTrue { BroadcastChannel<Int>(Channel.CONFLATED) is ConflatedBroadcastChannel }
+ }
+
+ @Test
+ fun testArrayBroadcastChannel() {
+ assertTrue { BroadcastChannel<Int>(1) is ArrayBroadcastChannel }
+ assertTrue { BroadcastChannel<Int>(10) is ArrayBroadcastChannel }
+ }
+
+ @Test
+ fun testInvalidCapacityNotSupported() {
+ assertFailsWith<IllegalArgumentException> { BroadcastChannel<Int>(-2) }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
new file mode 100644
index 0000000..d4c5126
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.test.*
+
+
+class ChannelFactoryTest : TestBase() {
+
+ @Test
+ fun testRendezvousChannel() {
+ assertTrue(Channel<Int>() is RendezvousChannel)
+ assertTrue(Channel<Int>(0) is RendezvousChannel)
+ }
+
+ @Test
+ fun testLinkedListChannel() {
+ assertTrue(Channel<Int>(Channel.UNLIMITED) is LinkedListChannel)
+ }
+
+ @Test
+ fun testConflatedChannel() {
+ assertTrue(Channel<Int>(Channel.CONFLATED) is ConflatedChannel)
+ }
+
+ @Test
+ fun testArrayChannel() {
+ assertTrue(Channel<Int>(1) is ArrayChannel)
+ assertTrue(Channel<Int>(10) is ArrayChannel)
+ }
+
+ @Test
+ fun testInvalidCapacityNotSupported() = runTest({ it is IllegalArgumentException }) {
+ Channel<Int>(-2)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
similarity index 77%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
index cdb4f45..0fe1fc9 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt
@@ -16,87 +16,72 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.async
-import kotlinx.coroutines.experimental.runBlocking
-import org.junit.Assert.*
-import org.junit.Test
+import kotlinx.coroutines.experimental.*
+import kotlin.math.*
+import kotlin.test.*
-class ChannelsTest {
+class ChannelsTest: TestBase() {
private val testList = listOf(1, 2, 3)
@Test
- fun testBlocking() {
- val ch = Channel<Int>()
- val sum = async {
- ch.sumBy { it }
- }
- repeat(10) {
- ch.sendBlocking(it)
- }
- ch.close()
- assertEquals(45, runBlocking { sum.await() })
-
- }
-
- @Test
- fun testIterableAsReceiveChannel() = runBlocking {
+ fun testIterableAsReceiveChannel() = runTest {
assertEquals(testList, testList.asReceiveChannel().toList())
}
@Test
- fun testSequenceAsReceiveChannel() = runBlocking {
+ fun testSequenceAsReceiveChannel() = runTest {
assertEquals(testList, testList.asSequence().asReceiveChannel().toList())
}
@Test
- fun testAssociate() = runBlocking {
+ fun testAssociate() = runTest {
assertEquals(testList.associate { it * 2 to it * 3 },
testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap())
}
@Test
- fun testAssociateBy() = runBlocking {
+ fun testAssociateBy() = runTest {
assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 })
}
@Test
- fun testAssociateBy2() = runBlocking {
+ fun testAssociateBy2() = runTest {
assertEquals(testList.associateBy({ it * 2}, { it * 3 }),
testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap())
}
@Test
- fun testDistinct() = runBlocking {
+ fun testDistinct() = runTest {
assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList())
}
@Test
- fun testDistinctBy() = runBlocking {
+ fun testDistinctBy() = runTest {
assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList())
}
@Test
- fun testToCollection() = runBlocking {
+ fun testToCollection() = runTest {
val target = mutableListOf<Int>()
testList.asReceiveChannel().toCollection(target)
assertEquals(testList, target)
}
@Test
- fun testDrop() = runBlocking {
+ fun testDrop() = runTest {
for (i in 0..testList.size) {
- assertEquals("Drop $i", testList.drop(i), testList.asReceiveChannel().drop(i).toList())
+ assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i")
}
}
@Test
- fun testElementAtOrElse() = runBlocking {
+ fun testElementAtOrElse() = runTest {
assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 })
assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 })
}
@Test
- fun testFirst() = runBlocking {
+ fun testFirst() = runTest {
assertEquals(testList.first(), testList.asReceiveChannel().first())
for (i in testList) {
assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i })
@@ -109,56 +94,56 @@
}
@Test
- fun testFirstOrNull() = runBlocking {
+ fun testFirstOrNull() = runTest {
assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull())
assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 })
assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 })
}
@Test
- fun testFlatMap() = runBlocking {
+ fun testFlatMap() = runTest {
assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList())
}
@Test
- fun testFold() = runBlocking {
+ fun testFold() = runTest {
assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } },
testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList())
}
@Test
- fun testFoldIndexed() = runBlocking {
+ fun testFoldIndexed() = runTest {
assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } },
testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList())
}
@Test
- fun testGroupBy() = runBlocking {
+ fun testGroupBy() = runTest {
assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 })
}
@Test
- fun testGroupBy2() = runBlocking {
+ fun testGroupBy2() = runTest {
assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap())
}
@Test
- fun testMap() = runBlocking {
+ fun testMap() = runTest {
assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList())
}
@Test
- fun testMapToCollection() = runBlocking {
+ fun testMapToCollection() = runTest {
val c = mutableListOf<Int>()
testList.asReceiveChannel().mapTo(c) { it + 10 }
assertEquals(testList.map { it + 10 }, c)
}
@Test
- fun testMapToSendChannel() = runBlocking {
+ fun testMapToSendChannel() = runTest {
val c = produce<Int> {
testList.asReceiveChannel().mapTo(channel) { it + 10 }
}
@@ -166,34 +151,34 @@
}
@Test
- fun testEmptyList() = runBlocking {
+ fun testEmptyList() = runTest {
assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
}
@Test
- fun testToList() = runBlocking {
+ fun testToList() = runTest {
assertEquals(testList, testList.asReceiveChannel().toList())
}
@Test
- fun testEmptySet() = runBlocking {
+ fun testEmptySet() = runTest {
assertTrue(emptyList<Nothing>().asReceiveChannel().toSet().isEmpty())
}
@Test
- fun testToSet() = runBlocking {
+ fun testToSet() = runTest {
assertEquals(testList.toSet(), testList.asReceiveChannel().toSet())
}
@Test
- fun testToMutableSet() = runBlocking {
+ fun testToMutableSet() = runTest {
assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet())
}
@Test
- fun testEmptySequence() = runBlocking {
+ fun testEmptySequence() = runTest {
val channel = Channel<Nothing>()
channel.close()
@@ -201,7 +186,7 @@
}
@Test
- fun testEmptyMap() = runBlocking {
+ fun testEmptyMap() = runTest {
val channel = Channel<Pair<Nothing, Nothing>>()
channel.close()
@@ -209,50 +194,50 @@
}
@Test
- fun testToMap() = runBlocking {
+ fun testToMap() = runTest {
val values = testList.map { it to it.toString() }
assertEquals(values.toMap(), values.asReceiveChannel().toMap())
}
@Test
- fun testReduce() = runBlocking {
+ fun testReduce() = runTest {
assertEquals(testList.reduce { acc, e -> acc * e },
testList.asReceiveChannel().reduce { acc, e -> acc * e })
}
@Test
- fun testReduceIndexed() = runBlocking {
+ fun testReduceIndexed() = runTest {
assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e },
testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e })
}
@Test
- fun testTake() = runBlocking {
+ fun testTake() = runTest {
for (i in 0..testList.size) {
assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList())
}
}
@Test
- fun testPartition() = runBlocking {
+ fun testPartition() = runTest {
assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 })
}
@Test
- fun testZip() = runBlocking {
+ fun testZip() = runTest {
val other = listOf("a", "b")
assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList())
}
@Test
- fun testElementAt() = runBlocking {
+ fun testElementAt() = runTest {
testList.indices.forEach { i ->
assertEquals(testList[i], testList.asReceiveChannel().elementAt(i))
}
}
@Test
- fun testElementAtOrNull() = runBlocking {
+ fun testElementAtOrNull() = runTest {
testList.indices.forEach { i ->
assertEquals(testList[i], testList.asReceiveChannel().elementAtOrNull(i))
}
@@ -261,7 +246,7 @@
}
@Test
- fun testFind() = runBlocking {
+ fun testFind() = runTest {
repeat(3) { mod ->
assertEquals(testList.find { it % 2 == mod },
testList.asReceiveChannel().find { it % 2 == mod })
@@ -269,28 +254,28 @@
}
@Test
- fun testFindLast() = runBlocking {
+ fun testFindLast() = runTest {
repeat(3) { mod ->
assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod })
}
}
@Test
- fun testIndexOf() = runBlocking {
+ fun testIndexOf() = runTest {
repeat(testList.size + 1) { i ->
assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i))
}
}
@Test
- fun testLastIndexOf() = runBlocking {
+ fun testLastIndexOf() = runTest {
repeat(testList.size + 1) { i ->
assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i))
}
}
@Test
- fun testIndexOfFirst() = runBlocking {
+ fun testIndexOfFirst() = runTest {
repeat(3) { mod ->
assertEquals(testList.indexOfFirst { it % 2 == mod },
testList.asReceiveChannel().indexOfFirst { it % 2 == mod })
@@ -298,7 +283,7 @@
}
@Test
- fun testIndexOfLast() = runBlocking {
+ fun testIndexOfLast() = runTest {
repeat(3) { mod ->
assertEquals(testList.indexOfLast { it % 2 != mod },
testList.asReceiveChannel().indexOfLast { it % 2 != mod })
@@ -306,13 +291,13 @@
}
@Test
- fun testLastOrNull() = runBlocking {
+ fun testLastOrNull() = runTest {
assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull())
assertEquals(null, emptyList<Int>().asReceiveChannel().lastOrNull())
}
@Test
- fun testSingleOrNull() = runBlocking {
+ fun testSingleOrNull() = runTest {
assertEquals(1, listOf(1).asReceiveChannel().singleOrNull())
assertEquals(null, listOf(1, 2).asReceiveChannel().singleOrNull())
assertEquals(null, emptyList<Int>().asReceiveChannel().singleOrNull())
@@ -327,7 +312,7 @@
}
@Test
- fun testDropWhile() = runBlocking {
+ fun testDropWhile() = runTest {
repeat(3) { mod ->
assertEquals(testList.dropWhile { it % 2 == mod },
testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList())
@@ -335,7 +320,7 @@
}
@Test
- fun testFilter() = runBlocking {
+ fun testFilter() = runTest {
repeat(3) { mod ->
assertEquals(testList.filter { it % 2 == mod },
testList.asReceiveChannel().filter { it % 2 == mod }.toList())
@@ -343,7 +328,7 @@
}
@Test
- fun testFilterToCollection() = runBlocking {
+ fun testFilterToCollection() = runTest {
repeat(3) { mod ->
val c = mutableListOf<Int>()
testList.asReceiveChannel().filterTo(c) { it % 2 == mod }
@@ -352,7 +337,7 @@
}
@Test
- fun testFilterToSendChannel() = runBlocking {
+ fun testFilterToSendChannel() = runTest {
repeat(3) { mod ->
val c = produce<Int> {
testList.asReceiveChannel().filterTo(channel) { it % 2 == mod }
@@ -362,7 +347,7 @@
}
@Test
- fun testFilterNot() = runBlocking {
+ fun testFilterNot() = runTest {
repeat(3) { mod ->
assertEquals(testList.filterNot { it % 2 == mod },
testList.asReceiveChannel().filterNot { it % 2 == mod }.toList())
@@ -370,7 +355,7 @@
}
@Test
- fun testFilterNotToCollection() = runBlocking {
+ fun testFilterNotToCollection() = runTest {
repeat(3) { mod ->
val c = mutableListOf<Int>()
testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod }
@@ -379,7 +364,7 @@
}
@Test
- fun testFilterNotToSendChannel() = runBlocking {
+ fun testFilterNotToSendChannel() = runTest {
repeat(3) { mod ->
val c = produce<Int> {
testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod }
@@ -389,7 +374,7 @@
}
@Test
- fun testFilterNotNull() = runBlocking {
+ fun testFilterNotNull() = runTest {
repeat(3) { mod ->
assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(),
testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList())
@@ -397,7 +382,7 @@
}
@Test
- fun testFilterNotNullToCollection() = runBlocking {
+ fun testFilterNotNullToCollection() = runTest {
repeat(3) { mod ->
val c = mutableListOf<Int>()
testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c)
@@ -406,7 +391,7 @@
}
@Test
- fun testFilterNotNullToSendChannel() = runBlocking {
+ fun testFilterNotNullToSendChannel() = runTest {
repeat(3) { mod ->
val c = produce<Int> {
testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel)
@@ -416,7 +401,7 @@
}
@Test
- fun testFilterIndexed() = runBlocking {
+ fun testFilterIndexed() = runTest {
repeat(3) { mod ->
assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod },
testList.asReceiveChannel().filterIndexed { index, _ -> index % 2 == mod }.toList())
@@ -424,7 +409,7 @@
}
@Test
- fun testFilterIndexedToCollection() = runBlocking {
+ fun testFilterIndexedToCollection() = runTest {
repeat(3) { mod ->
val c = mutableListOf<Int>()
testList.asReceiveChannel().filterIndexedTo(c) { index, _ -> index % 2 == mod }
@@ -433,17 +418,17 @@
}
@Test
- fun testFilterIndexedToChannel() = runBlocking {
+ fun testFilterIndexedToChannel() = runTest {
repeat(3) { mod ->
val c = produce<Int> {
- testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod }
+ testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod }
}
assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c.toList())
}
}
@Test
- fun testTakeWhile() = runBlocking {
+ fun testTakeWhile() = runTest {
repeat(3) { mod ->
assertEquals(testList.takeWhile { it % 2 != mod },
testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList())
@@ -451,7 +436,7 @@
}
@Test
- fun testToChannel() = runBlocking {
+ fun testToChannel() = runTest {
val c = produce<Int> {
testList.asReceiveChannel().toChannel(channel)
}
@@ -459,20 +444,20 @@
}
@Test
- fun testMapIndexed() = runBlocking {
+ fun testMapIndexed() = runTest {
assertEquals(testList.mapIndexed { index, i -> index + i },
testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList())
}
@Test
- fun testMapIndexedToCollection() = runBlocking {
+ fun testMapIndexedToCollection() = runTest {
val c = mutableListOf<Int>()
testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i }
assertEquals(testList.mapIndexed { index, i -> index + i }, c)
}
@Test
- fun testMapIndexedToSendChannel() = runBlocking {
+ fun testMapIndexedToSendChannel() = runTest {
val c = produce<Int> {
testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i }
}
@@ -480,7 +465,7 @@
}
@Test
- fun testMapNotNull() = runBlocking {
+ fun testMapNotNull() = runTest {
repeat(3) { mod ->
assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } },
testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList())
@@ -488,7 +473,7 @@
}
@Test
- fun testMapNotNullToCollection() = runBlocking {
+ fun testMapNotNullToCollection() = runTest {
repeat(3) { mod ->
val c = mutableListOf<Int>()
testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } }
@@ -497,7 +482,7 @@
}
@Test
- fun testMapNotNullToSendChannel() = runBlocking {
+ fun testMapNotNullToSendChannel() = runTest {
repeat(3) { mod ->
val c = produce<Int> {
testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } }
@@ -507,7 +492,7 @@
}
@Test
- fun testMapIndexedNotNull() = runBlocking {
+ fun testMapIndexedNotNull() = runTest {
repeat(3) { mod ->
assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } },
testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList())
@@ -515,7 +500,7 @@
}
@Test
- fun testMapIndexedNotNullToCollection() = runBlocking {
+ fun testMapIndexedNotNullToCollection() = runTest {
repeat(3) { mod ->
val c = mutableListOf<Int>()
testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } }
@@ -524,7 +509,7 @@
}
@Test
- fun testMapIndexedNotNullToSendChannel() = runBlocking {
+ fun testMapIndexedNotNullToSendChannel() = runTest {
repeat(3) { mod ->
val c = produce<Int> {
testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } }
@@ -534,50 +519,51 @@
}
@Test
- fun testWithIndex() = runBlocking {
+ fun testWithIndex() = runTest {
assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList())
}
@Test
- fun testMaxBy() = runBlocking {
- assertEquals(testList.maxBy { 10 - Math.abs(it - 2) },
- testList.asReceiveChannel().maxBy { 10 - Math.abs(it - 2) })
+ fun testMaxBy() = runTest {
+ assertEquals(testList.maxBy { 10 - abs(it - 2) },
+ testList.asReceiveChannel().maxBy { 10 - abs(it - 2) })
}
@Test
- fun testMaxWith() = runBlocking {
- val cmp = compareBy<Int> { 10 - Math.abs(it - 2) }
+ fun testMaxWith() = runTest {
+ val cmp = compareBy<Int> { 10 - abs(it - 2) }
assertEquals(testList.maxWith(cmp),
testList.asReceiveChannel().maxWith(cmp))
}
@Test
- fun testMinBy() = runBlocking {
- assertEquals(testList.minBy { Math.abs(it - 2) },
- testList.asReceiveChannel().minBy { Math.abs(it - 2) })
+ fun testMinBy() = runTest {
+ assertEquals(testList.minBy { abs(it - 2) },
+ testList.asReceiveChannel().minBy { abs(it - 2) })
}
@Test
- fun testMinWith() = runBlocking {
- val cmp = compareBy<Int> { Math.abs(it - 2) }
+ fun testMinWith() = runTest {
+ val cmp = compareBy<Int> { abs(it - 2) }
assertEquals(testList.minWith(cmp),
testList.asReceiveChannel().minWith(cmp))
}
@Test
- fun testSumBy() = runBlocking {
+ fun testSumBy() = runTest {
assertEquals(testList.sumBy { it * 3 },
testList.asReceiveChannel().sumBy { it * 3 })
}
@Test
- fun testSumByDouble() = runBlocking {
- assertEquals(testList.sumByDouble { it * 3.0 },
- testList.asReceiveChannel().sumByDouble { it * 3.0 }, 1e-10)
+ fun testSumByDouble() = runTest {
+ val expected = testList.sumByDouble { it * 3.0 }
+ val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 }
+ assertEquals(expected, actual)
}
@Test
- fun testRequireNoNulls() = runBlocking {
+ fun testRequireNoNulls() = runTest {
assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList())
}
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
new file mode 100644
index 0000000..2cd539b
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ConflatedBroadcastChannelTest : TestBase() {
+
+ @Test
+ fun testBasicScenario() = runTest {
+ expect(1)
+ val broadcast = ConflatedBroadcastChannel<String>()
+ assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+ assertNull(broadcast.valueOrNull)
+
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val sub = broadcast.openSubscription()
+ assertNull(sub.poll())
+ expect(3)
+ assertEquals("one", sub.receive()) // suspends
+ expect(6)
+ assertEquals("two", sub.receive()) // suspends
+ expect(12)
+ sub.close()
+ expect(13)
+ }
+
+ expect(4)
+ broadcast.send("one") // does not suspend
+ assertEquals("one", broadcast.value)
+ assertEquals("one", broadcast.valueOrNull)
+ expect(5)
+ yield() // to receiver
+ expect(7)
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(8)
+ val sub = broadcast.openSubscription()
+ assertEquals("one", sub.receive()) // does not suspend
+ expect(9)
+ assertEquals("two", sub.receive()) // suspends
+ expect(14)
+ assertEquals("three", sub.receive()) // suspends
+ expect(17)
+ assertNull(sub.receiveOrNull()) // suspends until closed
+ expect(20)
+ sub.close()
+ expect(21)
+ }
+
+ expect(10)
+ broadcast.send("two") // does not suspend
+ assertEquals("two", broadcast.value)
+ assertEquals("two", broadcast.valueOrNull)
+ expect(11)
+ yield() // to both receivers
+ expect(15)
+ broadcast.send("three") // does not suspend
+ assertEquals("three", broadcast.value)
+ assertEquals("three", broadcast.valueOrNull)
+ expect(16)
+ yield() // to second receiver
+ expect(18)
+ broadcast.close()
+ assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException)
+ assertNull(broadcast.valueOrNull)
+ expect(19)
+ yield() // to second receiver
+ assertTrue(exceptionFrom { broadcast.send("four") } is ClosedSendChannelException)
+ finish(22)
+ }
+
+ @Test
+ fun testInitialValueAndReceiveClosed() = runTest {
+ expect(1)
+ val broadcast = ConflatedBroadcastChannel(1)
+ assertEquals(1, broadcast.value)
+ assertEquals(1, broadcast.valueOrNull)
+ launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val sub = broadcast.openSubscription()
+ assertEquals(1, sub.receive())
+ expect(3)
+ assertTrue(exceptionFrom { sub.receive() } is ClosedReceiveChannelException) // suspends
+ expect(6)
+ }
+ expect(4)
+ broadcast.close()
+ expect(5)
+ yield() // to child
+ finish(7)
+ }
+
+ inline fun exceptionFrom(block: () -> Unit): Throwable? {
+ try {
+ block()
+ return null
+ } catch (e: Throwable) {
+ return e
+ }
+ }
+
+ // Workaround for KT-23921
+ fun exceptionFromNotInline(block: () -> Unit): Throwable? {
+ try {
+ block()
+ return null
+ } catch (e: Throwable) {
+ return e
+ }
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
similarity index 68%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
index b6eafbd..2b8775b 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt
@@ -17,51 +17,48 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
-import org.hamcrest.core.*
-import org.junit.*
-import org.junit.Assert.*
-import java.io.IOException
import kotlin.coroutines.experimental.*
+import kotlin.test.*
class ConflatedChannelTest : TestBase() {
@Test
fun testBasicConflationOfferPoll() {
val q = ConflatedChannel<Int>()
- assertThat(q.poll(), IsNull())
- assertThat(q.offer(1), IsEqual(true))
- assertThat(q.offer(2), IsEqual(true))
- assertThat(q.offer(3), IsEqual(true))
- assertThat(q.poll(), IsEqual(3))
- assertThat(q.poll(), IsNull())
+ assertNull(q.poll())
+ assertTrue(q.offer(1))
+ assertTrue(q.offer(2))
+ assertTrue(q.offer(3))
+ assertEquals(3, q.poll())
+ assertNull(q.poll())
}
@Test
- fun testConflatedSend() = runBlocking<Unit> {
+ fun testConflatedSend() = runTest {
val q = ConflatedChannel<Int>()
q.send(1)
q.send(2) // shall conflated previously sent
- assertThat(q.receiveOrNull(), IsEqual(2))
+ assertEquals(2, q.receiveOrNull())
}
@Test
- fun testConflatedClose() = runBlocking<Unit> {
+ fun testConflatedClose() = runTest {
val q = ConflatedChannel<Int>()
q.send(1)
q.close() // shall conflate sent item and become closed
- assertThat(q.receiveOrNull(), IsNull())
+ assertNull(q.receiveOrNull())
}
@Test
- fun testConflationSendReceive() = runBlocking<Unit> {
+ fun testConflationSendReceive() = runTest {
val q = ConflatedChannel<Int>()
expect(1)
launch(coroutineContext) { // receiver coroutine
expect(4)
- assertThat(q.receive(), IsEqual(2))
+ assertEquals(2, q.receive())
expect(5)
- assertThat(q.receive(), IsEqual(3)) // this receive suspends
+ assertEquals(3, q.receive()) // this receive suspends
expect(8)
- assertThat(q.receive(), IsEqual(6)) // last conflated value
+ assertEquals(6, q.receive()) // last conflated value
expect(9)
}
expect(2)
@@ -80,7 +77,7 @@
}
@Test
- fun testConsumeAll() = runBlocking {
+ fun testConsumeAll() = runTest {
val q = ConflatedChannel<Int>()
expect(1)
for (i in 1..10) {
@@ -93,10 +90,12 @@
finish(2)
}
- @Test(expected = IOException::class)
- fun testCancelWithCause() = runBlocking<Unit> {
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
val channel = ConflatedChannel<Int>()
- channel.cancel(IOException())
+ channel.cancel(TestException())
channel.receiveOrNull()
}
+
+ private class TestException : Exception()
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
similarity index 65%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
index 0b5f169..2bf376f 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt
@@ -16,31 +16,26 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.TestBase
-import kotlinx.coroutines.experimental.runBlocking
-import org.hamcrest.MatcherAssert.assertThat
-import org.hamcrest.core.IsEqual
-import org.hamcrest.core.IsNull
-import org.junit.Test
-import java.io.IOException
+import kotlinx.coroutines.experimental.*
+import kotlin.test.*
class LinkedListChannelTest : TestBase() {
@Test
- fun testBasic() = runBlocking {
+ fun testBasic() = runTest {
val c = LinkedListChannel<Int>()
c.send(1)
check(c.offer(2))
c.send(3)
check(c.close())
check(!c.close())
- assertThat(c.receive(), IsEqual(1))
- assertThat(c.poll(), IsEqual(2))
- assertThat(c.receiveOrNull(), IsEqual(3))
- assertThat(c.receiveOrNull(), IsNull())
+ assertEquals(1, c.receive())
+ assertEquals(2, c.poll())
+ assertEquals(3, c.receiveOrNull())
+ assertNull(c.receiveOrNull())
}
@Test
- fun testConsumeAll() = runBlocking {
+ fun testConsumeAll() = runTest {
val q = LinkedListChannel<Int>()
for (i in 1..10) {
q.send(i) // buffers
@@ -51,10 +46,12 @@
check(q.receiveOrNull() == null)
}
- @Test(expected = IOException::class)
- fun testCancelWithCause() = runBlocking<Unit> {
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
val channel = LinkedListChannel<Int>()
- channel.cancel(IOException())
+ channel.cancel(TestException())
channel.receiveOrNull()
}
+
+ private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
new file mode 100644
index 0000000..6646aa6
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
@@ -0,0 +1,55 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class ProduceConsumeTest : TestBase() {
+
+ @Test
+ fun testRendezvous() = runTest {
+ testProducer(1)
+ }
+
+ @Test
+ fun testSmallBuffer() = runTest {
+ testProducer(1)
+ }
+
+ @Test
+ fun testMediumBuffer() = runTest {
+ testProducer(10)
+ }
+
+ @Test
+ fun testLargeMediumBuffer() = runTest {
+ testProducer(1000)
+ }
+
+ @Test
+ fun testUnlimited() = runTest {
+ testProducer(Channel.UNLIMITED)
+ }
+
+ private suspend fun testProducer(producerCapacity: Int) {
+ testProducer(1, producerCapacity)
+ testProducer(10, producerCapacity)
+ testProducer(100, producerCapacity)
+ }
+
+ private suspend fun testProducer(messages: Int, producerCapacity: Int) {
+ var sentAll = false
+ val producer = produce(coroutineContext, capacity = producerCapacity) {
+ for (i in 1..messages) {
+ send(i)
+ }
+ sentAll = true
+ }
+ var consumed = 0
+ for (x in producer) {
+ consumed++
+ }
+ assertTrue(sentAll)
+ assertEquals(messages, consumed)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
similarity index 92%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
index 969284e..41ba678 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt
@@ -17,13 +17,10 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
-import org.junit.*
-import java.io.IOException
import kotlin.coroutines.experimental.*
-import kotlin.test.assertNull
+import kotlin.test.*
class ProduceTest : TestBase() {
-
@Test
fun testBasic() = runTest {
val c = produce(coroutineContext) {
@@ -77,7 +74,7 @@
} catch (e: Exception) {
finish(6)
check(e is JobCancellationException && e.job == coroutineContext[Job])
- check(e.cause is IOException)
+ check(e.cause is TestException)
throw e
}
expectUnreached()
@@ -86,13 +83,15 @@
expect(1)
check(c.receive() == 1)
expect(4)
- c.cancel(IOException())
+ c.cancel(TestException())
try {
assertNull(c.receiveOrNull())
expectUnreached()
- } catch (e: IOException) {
+ } catch (e: TestException) {
expect(5)
}
}
+
+ private class TestException : Exception()
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
similarity index 83%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
index eaf1993..2b1b987 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
@@ -17,15 +17,12 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
-import org.hamcrest.core.*
-import org.junit.*
-import org.junit.Assert.*
-import java.io.IOException
import kotlin.coroutines.experimental.*
+import kotlin.test.*
class RendezvousChannelTest : TestBase() {
@Test
- fun testSimple() = runBlocking {
+ fun testSimple() = runTest {
val q = RendezvousChannel<Int>()
check(q.isEmpty && q.isFull)
expect(1)
@@ -52,25 +49,7 @@
}
@Test
- fun testStress() = runBlocking {
- val n = 100_000
- val q = RendezvousChannel<Int>()
- val sender = launch(coroutineContext) {
- for (i in 1..n) q.send(i)
- expect(2)
- }
- val receiver = launch(coroutineContext) {
- for (i in 1..n) check(q.receive() == i)
- expect(3)
- }
- expect(1)
- sender.join()
- receiver.join()
- finish(4)
- }
-
- @Test
- fun testClosedReceiveOrNull() = runBlocking {
+ fun testClosedReceiveOrNull() = runTest {
val q = RendezvousChannel<Int>()
check(q.isEmpty && q.isFull && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
@@ -92,7 +71,7 @@
}
@Test
- fun testClosedExceptions() = runBlocking {
+ fun testClosedExceptions() = runTest {
val q = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -114,7 +93,7 @@
}
@Test
- fun testOfferAndPool() = runBlocking {
+ fun testOfferAndPool() = runTest {
val q = RendezvousChannel<Int>()
assertFalse(q.offer(1))
expect(1)
@@ -142,7 +121,7 @@
}
@Test
- fun testIteratorClosed() = runBlocking {
+ fun testIteratorClosed() = runTest {
val q = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -158,7 +137,7 @@
}
@Test
- fun testIteratorOne() = runBlocking {
+ fun testIteratorOne() = runTest {
val q = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -177,7 +156,7 @@
}
@Test
- fun testIteratorOneWithYield() = runBlocking {
+ fun testIteratorOneWithYield() = runTest {
val q = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -198,7 +177,7 @@
}
@Test
- fun testIteratorTwo() = runBlocking {
+ fun testIteratorTwo() = runTest {
val q = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -222,7 +201,7 @@
}
@Test
- fun testIteratorTwoWithYield() = runBlocking {
+ fun testIteratorTwoWithYield() = runTest {
val q = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -248,7 +227,7 @@
}
@Test
- fun testSuspendSendOnClosedChannel() = runBlocking<Unit> {
+ fun testSuspendSendOnClosedChannel() = runTest {
val q = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -267,9 +246,9 @@
expect(7)
yield() // try to resume sender (it will not resume despite the close!)
expect(8)
- assertThat(q.receiveOrNull(), IsEqual(42))
+ assertEquals(42, q.receiveOrNull())
expect(9)
- assertThat(q.receiveOrNull(), IsNull())
+ assertNull(q.receiveOrNull())
expect(10)
yield() // to sender, it was resumed!
finish(12)
@@ -282,7 +261,7 @@
}
@Test
- fun testProduceBadClass() = runBlocking {
+ fun testProduceBadClass() = runTest {
val bad = BadClass()
val c = produce(coroutineContext) {
expect(1)
@@ -293,7 +272,7 @@
}
@Test
- fun testConsumeAll() = runBlocking {
+ fun testConsumeAll() = runTest {
val q = RendezvousChannel<Int>()
for (i in 1..10) {
launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
@@ -310,10 +289,12 @@
finish(12)
}
- @Test(expected = IOException::class)
- fun testCancelWithCause() = runBlocking<Unit> {
+ @Test
+ fun testCancelWithCause() = runTest({ it is TestException }) {
val channel = RendezvousChannel<Int>()
- channel.cancel(IOException())
+ channel.cancel(TestException())
channel.receiveOrNull()
}
-}
\ No newline at end of file
+
+ private class TestException : Exception()
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
new file mode 100644
index 0000000..c85f541
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt
@@ -0,0 +1,46 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SendReceiveStressTest : TestBase() {
+
+ // Emulate parametrized by hand :(
+
+ @Test
+ fun testArrayChannel() = runTest {
+ testStress(ArrayChannel(2))
+ }
+
+ @Test
+ fun testLinkedListChannel() = runTest {
+ testStress(LinkedListChannel())
+ }
+
+ @Test
+ fun testRendezvousChannel() = runTest {
+ testStress(RendezvousChannel())
+ }
+
+ private suspend fun testStress(channel: Channel<Int>) {
+ val n = 1_000 // Do not increase, otherwise node.js will fail with timeout :(
+ val sender = launch(coroutineContext) {
+ for (i in 1..n) {
+ channel.send(i)
+ }
+ expect(2)
+ }
+ val receiver = launch(coroutineContext) {
+ for (i in 1..n) {
+ val next = channel.receive()
+ check(next == i)
+ }
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
new file mode 100644
index 0000000..69e939f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
@@ -0,0 +1,35 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+class SimpleSendReceiveTest : TestBase() {
+
+ @Test
+ fun testSimpleSendReceive() = runTest {
+ // Parametrized common test :(
+ TestChannelKind.values().forEach { kind -> testSendReceive(kind, 100) }
+ }
+
+ private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) {
+ val channel = kind.create()
+
+ launch(coroutineContext) {
+ repeat(iterations) { channel.send(it) }
+ channel.close()
+ }
+ var expected = 0
+ for (x in channel) {
+ if (!kind.isConflated) {
+ assertEquals(expected++, x)
+ } else {
+ assertTrue(x >= expected)
+ expected = x + 1
+ }
+ }
+ if (!kind.isConflated) {
+ assertEquals(iterations, expected)
+ }
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
similarity index 94%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
index e025b19..60dbb97 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt
@@ -18,15 +18,15 @@
enum class TestBroadcastChannelKind {
ARRAY_1 {
- override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel<T>(1)
+ override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(1)
override fun toString(): String = "ArrayBroadcastChannel(1)"
},
ARRAY_10 {
- override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel<T>(10)
+ override fun <T> create(): BroadcastChannel<T> = ArrayBroadcastChannel(10)
override fun toString(): String = "ArrayBroadcastChannel(10)"
},
CONFLATED {
- override fun <T> create(): BroadcastChannel<T> = ConflatedBroadcastChannel<T>()
+ override fun <T> create(): BroadcastChannel<T> = ConflatedBroadcastChannel()
override fun toString(): String = "ConflatedBroadcastChannel"
override val isConflated: Boolean get() = true
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
similarity index 82%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
index 36fa8c3..c3ac904 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt
@@ -20,23 +20,23 @@
enum class TestChannelKind {
RENDEZVOUS {
- override fun create(): Channel<Int> = RendezvousChannel<Int>()
+ override fun create(): Channel<Int> = RendezvousChannel()
override fun toString(): String = "RendezvousChannel"
},
ARRAY_1 {
- override fun create(): Channel<Int> = ArrayChannel<Int>(1)
+ override fun create(): Channel<Int> = ArrayChannel(1)
override fun toString(): String = "ArrayChannel(1)"
},
ARRAY_10 {
- override fun create(): Channel<Int> = ArrayChannel<Int>(8)
+ override fun create(): Channel<Int> = ArrayChannel(8)
override fun toString(): String = "ArrayChannel(8)"
},
LINKED_LIST {
- override fun create(): Channel<Int> = LinkedListChannel<Int>()
+ override fun create(): Channel<Int> = LinkedListChannel()
override fun toString(): String = "LinkedListChannel"
},
CONFLATED {
- override fun create(): Channel<Int> = ConflatedChannel<Int>()
+ override fun create(): Channel<Int> = ConflatedChannel()
override fun toString(): String = "ConflatedChannel"
override val isConflated: Boolean get() = true
},
@@ -66,8 +66,12 @@
override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
override val isEmpty: Boolean get() = sub.isEmpty
- suspend override fun receive(): E = sub.receive()
- suspend override fun receiveOrNull(): E? = sub.receiveOrNull()
+
+ // Workaround for KT-23094
+ override suspend fun send(element: E) = broadcast.send(element)
+
+ override suspend fun receive(): E = sub.receive()
+ override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
override fun poll(): E? = sub.poll()
override fun iterator(): ChannelIterator<E> = sub.iterator()
override fun cancel(cause: Throwable?): Boolean = sub.cancel(cause)
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt
similarity index 85%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt
index 54aaae0..6e6c508 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt
@@ -19,13 +19,13 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.intrinsics.*
-import org.junit.*
-import org.junit.Assert.*
import kotlin.coroutines.experimental.*
+import kotlin.test.*
class SelectArrayChannelTest : TestBase() {
+
@Test
- fun testSelectSendSuccess() = runBlocking<Unit> {
+ fun testSelectSendSuccess() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
launch(coroutineContext) {
@@ -44,7 +44,7 @@
}
@Test
- fun testSelectSendSuccessWithDefault() = runBlocking<Unit> {
+ fun testSelectSendSuccessWithDefault() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
launch(coroutineContext) {
@@ -66,7 +66,7 @@
}
@Test
- fun testSelectSendReceiveBuf() = runBlocking<Unit> {
+ fun testSelectSendReceiveBuf() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
select<Unit> {
@@ -85,7 +85,7 @@
}
@Test
- fun testSelectSendWait() = runBlocking<Unit> {
+ fun testSelectSendWait() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
launch(coroutineContext) {
@@ -107,7 +107,7 @@
}
@Test
- fun testSelectReceiveSuccess() = runBlocking<Unit> {
+ fun testSelectReceiveSuccess() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
channel.send("OK")
@@ -122,7 +122,7 @@
}
@Test
- fun testSelectReceiveSuccessWithDefault() = runBlocking<Unit> {
+ fun testSelectReceiveSuccessWithDefault() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
channel.send("OK")
@@ -140,7 +140,7 @@
}
@Test
- fun testSelectReceiveWaitWithDefault() = runBlocking<Unit> {
+ fun testSelectReceiveWaitWithDefault() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
select<Unit> {
@@ -170,7 +170,7 @@
}
@Test
- fun testSelectReceiveWait() = runBlocking<Unit> {
+ fun testSelectReceiveWait() = runTest {
expect(1)
val channel = ArrayChannel<String>(1)
launch(coroutineContext) {
@@ -188,8 +188,8 @@
finish(6)
}
- @Test(expected = ClosedReceiveChannelException::class)
- fun testSelectReceiveClosed() = runBlocking<Unit> {
+ @Test
+ fun testSelectReceiveClosed() = runTest({it is ClosedReceiveChannelException}) {
expect(1)
val channel = ArrayChannel<String>(1)
channel.close()
@@ -202,8 +202,8 @@
expectUnreached()
}
- @Test(expected = ClosedReceiveChannelException::class)
- fun testSelectReceiveWaitClosed() = runBlocking<Unit> {
+ @Test
+ fun testSelectReceiveWaitClosed() = runTest({it is ClosedReceiveChannelException}) {
expect(1)
val channel = ArrayChannel<String>(1)
launch(coroutineContext) {
@@ -221,9 +221,9 @@
}
@Test
- fun testSelectSendResourceCleanup() = runBlocking<Unit> {
+ fun testSelectSendResourceCleanup() = runTest {
val channel = ArrayChannel<Int>(1)
- val n = 10_000_000 * stressTestMultiplier
+ val n = 1000
expect(1)
channel.send(-1) // fill the buffer, so all subsequent sends cannot proceed
repeat(n) { i ->
@@ -236,9 +236,9 @@
}
@Test
- fun testSelectReceiveResourceCleanup() = runBlocking<Unit> {
+ fun testSelectReceiveResourceCleanup() = runTest {
val channel = ArrayChannel<Int>(1)
- val n = 10_000_000 * stressTestMultiplier
+ val n = 1000
expect(1)
repeat(n) { i ->
select {
@@ -250,7 +250,7 @@
}
@Test
- fun testSelectReceiveDispatchNonSuspending() = runBlocking<Unit> {
+ fun testSelectReceiveDispatchNonSuspending() = runTest {
val channel = ArrayChannel<Int>(1)
expect(1)
channel.send(42)
@@ -272,7 +272,7 @@
}
@Test
- fun testSelectReceiveDispatchNonSuspending2() = runBlocking<Unit> {
+ fun testSelectReceiveDispatchNonSuspending2() = runTest {
val channel = ArrayChannel<Int>(1)
expect(1)
channel.send(42)
@@ -303,4 +303,4 @@
if (!trySelect(null)) return
block.startCoroutineUndispatched(this)
}
-}
\ No newline at end of file
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt
similarity index 90%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt
index eee7283..20322cd 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt
@@ -17,15 +17,14 @@
package kotlinx.coroutines.experimental.selects
import kotlinx.coroutines.experimental.*
-import org.junit.*
-import org.junit.Assert.*
import kotlin.coroutines.experimental.*
+import kotlin.test.*
-class SelectBiasTest {
+class SelectBiasTest : TestBase() {
val n = 10_000
@Test
- fun testBiased() = runBlocking<Unit> {
+ fun testBiased() = runTest {
val d0 = async(coroutineContext) { 0 }
val d1 = async(coroutineContext) { 1 }
val counter = IntArray(2)
@@ -41,7 +40,7 @@
}
@Test
- fun testUnbiased() = runBlocking<Unit> {
+ fun testUnbiased() = runTest {
val d0 = async(coroutineContext) { 0 }
val d1 = async(coroutineContext) { 1 }
val counter = IntArray(2)
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt
similarity index 87%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt
rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt
index d71be6a..04cde2a 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt
+++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt
@@ -13,19 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
package kotlinx.coroutines.experimental.selects
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.intrinsics.*
-import org.junit.*
-import org.junit.Assert.*
import kotlin.coroutines.experimental.*
+import kotlin.test.*
class SelectRendezvousChannelTest : TestBase() {
+
@Test
- fun testSelectSendSuccess() = runBlocking<Unit> {
+ fun testSelectSendSuccess() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
launch(coroutineContext) {
@@ -44,7 +45,7 @@
}
@Test
- fun testSelectSendSuccessWithDefault() = runBlocking<Unit> {
+ fun testSelectSendSuccessWithDefault() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
launch(coroutineContext) {
@@ -66,7 +67,7 @@
}
@Test
- fun testSelectSendWaitWithDefault() = runBlocking<Unit> {
+ fun testSelectSendWaitWithDefault() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
select<Unit> {
@@ -92,7 +93,7 @@
}
@Test
- fun testSelectSendWait() = runBlocking<Unit> {
+ fun testSelectSendWait() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
launch(coroutineContext) {
@@ -110,7 +111,7 @@
}
@Test
- fun testSelectReceiveSuccess() = runBlocking<Unit> {
+ fun testSelectReceiveSuccess() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
launch(coroutineContext) {
@@ -130,7 +131,7 @@
}
@Test
- fun testSelectReceiveSuccessWithDefault() = runBlocking<Unit> {
+ fun testSelectReceiveSuccessWithDefault() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
launch(coroutineContext) {
@@ -153,7 +154,7 @@
}
@Test
- fun testSelectReceiveWaitWithDefault() = runBlocking<Unit> {
+ fun testSelectReceiveWaitWithDefault() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
select<Unit> {
@@ -179,7 +180,7 @@
}
@Test
- fun testSelectReceiveWait() = runBlocking<Unit> {
+ fun testSelectReceiveWait() = runTest {
expect(1)
val channel = RendezvousChannel<String>()
launch(coroutineContext) {
@@ -197,8 +198,8 @@
finish(6)
}
- @Test(expected = ClosedReceiveChannelException::class)
- fun testSelectReceiveClosed() = runBlocking<Unit> {
+ @Test
+ fun testSelectReceiveClosed() = runTest(expected = { it is ClosedReceiveChannelException }) {
expect(1)
val channel = RendezvousChannel<String>()
channel.close()
@@ -211,8 +212,8 @@
expectUnreached()
}
- @Test(expected = ClosedReceiveChannelException::class)
- fun testSelectReceiveWaitClosed() = runBlocking<Unit> {
+ @Test
+ fun testSelectReceiveWaitClosed() = runTest(expected = {it is ClosedReceiveChannelException}) {
expect(1)
val channel = RendezvousChannel<String>()
launch(coroutineContext) {
@@ -230,9 +231,9 @@
}
@Test
- fun testSelectSendResourceCleanup() = runBlocking<Unit> {
+ fun testSelectSendResourceCleanup() = runTest {
val channel = RendezvousChannel<Int>()
- val n = 10_000_000 * stressTestMultiplier
+ val n = 1_000
expect(1)
repeat(n) { i ->
select {
@@ -244,9 +245,9 @@
}
@Test
- fun testSelectReceiveResourceCleanup() = runBlocking<Unit> {
+ fun testSelectReceiveResourceCleanup() = runTest {
val channel = RendezvousChannel<Int>()
- val n = 10_000_000 * stressTestMultiplier
+ val n = 1_000
expect(1)
repeat(n) { i ->
select {
@@ -258,7 +259,7 @@
}
@Test
- fun testSelectAtomicFailure() = runBlocking<Unit> {
+ fun testSelectAtomicFailure() = runTest {
val c1 = RendezvousChannel<Int>()
val c2 = RendezvousChannel<Int>()
expect(1)
@@ -289,7 +290,7 @@
}
@Test
- fun testSelectWaitDispatch() = runBlocking<Unit> {
+ fun testSelectWaitDispatch() = runTest {
val c = RendezvousChannel<Int>()
expect(1)
launch(coroutineContext) {
@@ -323,4 +324,4 @@
if (!trySelect(null)) return
block.startCoroutineUndispatched(this)
}
-}
\ No newline at end of file
+}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvm.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvm.kt
new file mode 100644
index 0000000..7596e62
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvm.kt
@@ -0,0 +1,22 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+
+// -------- Operations on SendChannel --------
+
+/**
+ * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
+ * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
+ *
+ * This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
+ * so this function should not be used from coroutine.
+ */
+public fun <E> SendChannel<E>.sendBlocking(element: E) {
+ // fast path
+ if (offer(element))
+ return
+ // slow path
+ runBlocking {
+ send(element)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt
new file mode 100644
index 0000000..5d3b74d
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt
@@ -0,0 +1,5 @@
+package kotlinx.coroutines.experimental.internal
+
+internal actual fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int){
+ System.arraycopy(source, srcPos, destination, destinationStart, length)
+}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt
new file mode 100644
index 0000000..86a7bbb
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt
@@ -0,0 +1,8 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Closeable entity.
+ * @suppress **Deprecated**
+ */
+@Deprecated("No replacement, see specific use")
+public actual typealias Closeable = java.io.Closeable
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt
new file mode 100644
index 0000000..0422d2b
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt
@@ -0,0 +1,11 @@
+package kotlinx.coroutines.experimental.internal
+
+import java.util.concurrent.*
+import kotlin.concurrent.withLock as withLockJvm
+
+internal actual fun <E> subscriberList(): MutableList<E> = CopyOnWriteArrayList<E>()
+
+@Suppress("ACTUAL_WITHOUT_EXPECT")
+internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock
+
+internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index 057ce9d..b99b3fc 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -42,7 +42,7 @@
private val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED")
/** @suppress **This is unstable API and it is subject to change.** */
-public typealias RemoveFirstDesc<T> = LockFreeLinkedListNode.RemoveFirstDesc<T>
+public actual typealias RemoveFirstDesc<T> = LockFreeLinkedListNode.RemoveFirstDesc<T>
/** @suppress **This is unstable API and it is subject to change.** */
public actual typealias AddLastDesc<T> = LockFreeLinkedListNode.AddLastDesc<T>
@@ -101,7 +101,7 @@
public actual val isRemoved: Boolean get() = next is Removed
// LINEARIZABLE. Returns Node | Removed
- public val next: Any get() {
+ public actual val next: Any get() {
_next.loop { next ->
if (next !is OpDescriptor) return next
next.perform(this)
@@ -111,7 +111,7 @@
public actual val nextNode: Node get() = next.unwrap()
// LINEARIZABLE. Returns Node | Removed
- public val prev: Any get() {
+ public actual val prev: Any get() {
_prev.loop { prev ->
if (prev is Removed) return prev
prev as Node // otherwise, it can be only node
@@ -311,7 +311,7 @@
// ------ multi-word atomic operations helpers ------
- public open class AddLastDesc<T : Node>(
+ public open class AddLastDesc<T : Node> constructor(
@JvmField val queue: Node,
@JvmField val node: T
) : AbstractAtomicDesc() {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelStressTest.kt
new file mode 100644
index 0000000..dd77e1e
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelStressTest.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.runner.*
+import org.junit.runners.*
+
+@RunWith(Parameterized::class)
+class ArrayChannelStressTest(private val capacity: Int) : TestBase() {
+
+ companion object {
+ @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = listOf(1, 10, 100, 100_000, 1_000_000).map { arrayOf<Any>(it) }
+ }
+
+ @Test
+ fun testStress() = runTest {
+ val n = 100_000 * stressTestMultiplier
+ val q = ArrayChannel<Int>(capacity)
+ val sender = launch(kotlin.coroutines.experimental.coroutineContext) {
+ for (i in 1..n) {
+ q.send(i)
+ }
+ expect(2)
+ }
+ val receiver = launch(kotlin.coroutines.experimental.coroutineContext) {
+ for (i in 1..n) {
+ val next = q.receive()
+ check(next == i)
+ }
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
deleted file mode 100644
index dd60d24..0000000
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kotlinx.coroutines.experimental.channels
-
-import org.hamcrest.MatcherAssert.assertThat
-import org.hamcrest.core.IsInstanceOf
-import org.junit.Test
-
-class BroadcastChannelFactoryTest {
- @Test(expected = IllegalArgumentException::class)
- fun testRendezvousChannelNotSupported() {
- BroadcastChannel<Int>(0)
- }
-
- @Test(expected = IllegalArgumentException::class)
- fun testLinkedListChannelNotSupported() {
- BroadcastChannel<Int>(Channel.UNLIMITED)
- }
-
- @Test
- fun testConflatedBroadcastChannel() {
- assertThat(BroadcastChannel<Int>(Channel.CONFLATED), IsInstanceOf(ConflatedBroadcastChannel::class.java))
- }
-
- @Test
- fun testArrayBroadcastChannel() {
- assertThat(BroadcastChannel<Int>(1), IsInstanceOf(ArrayBroadcastChannel::class.java))
- assertThat(BroadcastChannel<Int>(10), IsInstanceOf(ArrayBroadcastChannel::class.java))
- }
-
- @Test(expected = IllegalArgumentException::class)
- fun testInvalidCapacityNotSupported() {
- BroadcastChannel<Int>(-2)
- }
-}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
deleted file mode 100644
index 5ea2bf2..0000000
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kotlinx.coroutines.experimental.channels
-
-import org.hamcrest.MatcherAssert.assertThat
-import org.hamcrest.core.IsInstanceOf
-import org.junit.Test
-
-class ChannelFactoryTest {
- @Test
- fun testRendezvousChannel() {
- assertThat(Channel<Int>(), IsInstanceOf(RendezvousChannel::class.java))
- assertThat(Channel<Int>(0), IsInstanceOf(RendezvousChannel::class.java))
- }
-
- @Test
- fun testLinkedListChannel() {
- assertThat(Channel<Int>(Channel.UNLIMITED), IsInstanceOf(LinkedListChannel::class.java))
- }
-
- @Test
- fun testConflatedChannel() {
- assertThat(Channel<Int>(Channel.CONFLATED), IsInstanceOf(ConflatedChannel::class.java))
- }
-
- @Test
- fun testArrayChannel() {
- assertThat(Channel<Int>(1), IsInstanceOf(ArrayChannel::class.java))
- assertThat(Channel<Int>(10), IsInstanceOf(ArrayChannel::class.java))
- }
-
- @Test(expected = IllegalArgumentException::class)
- fun testInvalidCapacityNotSupported() {
- Channel<Int>(-2)
- }
-}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvmTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvmTest.kt
new file mode 100644
index 0000000..6a5e509
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvmTest.kt
@@ -0,0 +1,21 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import kotlin.test.*
+
+class ChannelsJvmTest : TestBase() {
+
+ @Test
+ fun testBlocking() {
+ val ch = Channel<Int>()
+ val sum = async {
+ ch.sumBy { it }
+ }
+ repeat(10) {
+ ch.sendBlocking(it)
+ }
+ ch.close()
+ assertEquals(45, runBlocking { sum.await() })
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
deleted file mode 100644
index 707b780..0000000
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kotlinx.coroutines.experimental.channels
-
-import kotlinx.coroutines.experimental.*
-import org.hamcrest.core.*
-import org.junit.*
-import org.junit.Assert.*
-import kotlin.coroutines.experimental.*
-
-class ConflatedBroadcastChannelTest : TestBase() {
- @Test
- fun testBasicScenario() = runBlocking {
- expect(1)
- val broadcast = ConflatedBroadcastChannel<String>()
- assertThat(exceptionFrom { broadcast.value }, IsInstanceOf(IllegalStateException::class.java))
- assertThat(broadcast.valueOrNull, IsNull())
- launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
- expect(2)
- val sub = broadcast.openSubscription()
- assertThat(sub.poll(), IsNull())
- expect(3)
- assertThat(sub.receive(), IsEqual("one")) // suspends
- expect(6)
- assertThat(sub.receive(), IsEqual("two")) // suspends
- expect(12)
- sub.close()
- expect(13)
- }
- expect(4)
- broadcast.send("one") // does not suspend
- assertThat(broadcast.value, IsEqual("one"))
- assertThat(broadcast.valueOrNull, IsEqual("one"))
- expect(5)
- yield() // to receiver
- expect(7)
- launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
- expect(8)
- val sub = broadcast.openSubscription()
- assertThat(sub.receive(), IsEqual("one")) // does not suspend
- expect(9)
- assertThat(sub.receive(), IsEqual("two")) // suspends
- expect(14)
- assertThat(sub.receive(), IsEqual("three")) // suspends
- expect(17)
- assertThat(sub.receiveOrNull(), IsNull()) // suspends until closed
- expect(20)
- sub.close()
- expect(21)
- }
- expect(10)
- broadcast.send("two") // does not suspend
- assertThat(broadcast.value, IsEqual("two"))
- assertThat(broadcast.valueOrNull, IsEqual("two"))
- expect(11)
- yield() // to both receivers
- expect(15)
- broadcast.send("three") // does not suspend
- assertThat(broadcast.value, IsEqual("three"))
- assertThat(broadcast.valueOrNull, IsEqual("three"))
- expect(16)
- yield() // to second receiver
- expect(18)
- broadcast.close()
- assertThat(exceptionFrom { broadcast.value }, IsInstanceOf(IllegalStateException::class.java))
- assertThat(broadcast.valueOrNull, IsNull())
- expect(19)
- yield() // to second receiver
- assertThat(exceptionFrom { broadcast.send("four") }, IsInstanceOf(ClosedSendChannelException::class.java))
- finish(22)
- }
-
- @Test
- fun testInitialValueAndReceiveClosed() = runBlocking {
- expect(1)
- val broadcast = ConflatedBroadcastChannel<Int>(1)
- assertThat(broadcast.value, IsEqual(1))
- assertThat(broadcast.valueOrNull, IsEqual(1))
- launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
- expect(2)
- val sub = broadcast.openSubscription()
- assertThat(sub.receive(), IsEqual(1))
- expect(3)
- assertThat(exceptionFrom { sub.receive() }, IsInstanceOf(ClosedReceiveChannelException::class.java)) // suspends
- expect(6)
- }
- expect(4)
- broadcast.close()
- expect(5)
- yield() // to child
- finish(7)
- }
-
- inline fun exceptionFrom(block: () -> Unit): Throwable? {
- try {
- block()
- return null
- } catch (e: Throwable) {
- return e
- }
- }
-}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt
index 1e4682f..90a254a 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt
@@ -35,4 +35,4 @@
actor.close()
}
}
-}
\ No newline at end of file
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeJvmTest.kt
similarity index 96%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
rename to core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeJvmTest.kt
index 8aea78f..9fde116 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeJvmTest.kt
@@ -24,7 +24,7 @@
import kotlin.coroutines.experimental.*
@RunWith(Parameterized::class)
-class ProduceConsumeTest(
+class ProduceConsumeJvmTest(
private val capacity: Int,
private val number: Int
) : TestBase() {
@@ -43,7 +43,7 @@
fun testProducer() = runTest {
var sentAll = false
val producer = produce(coroutineContext, capacity = capacity) {
- for(i in 1..number) {
+ for (i in 1..number) {
send(i)
}
sentAll = true
@@ -72,4 +72,4 @@
actor.close()
assertEquals(number, received.await())
}
-}
\ No newline at end of file
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RandevouzChannelStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RandevouzChannelStressTest.kt
new file mode 100644
index 0000000..0faa693
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RandevouzChannelStressTest.kt
@@ -0,0 +1,26 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import kotlin.coroutines.experimental.*
+
+class RandevouzChannelStressTest : TestBase() {
+
+ @Test
+ fun testStress() = runTest {
+ val n = 100_000 * stressTestMultiplier
+ val q = RendezvousChannel<Int>()
+ val sender = launch(coroutineContext) {
+ for (i in 1..n) q.send(i)
+ expect(2)
+ }
+ val receiver = launch(coroutineContext) {
+ for (i in 1..n) check(q.receive() == i)
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveJvmStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveJvmStressTest.kt
new file mode 100644
index 0000000..4cea191
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveJvmStressTest.kt
@@ -0,0 +1,45 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.runner.*
+import org.junit.runners.*
+import kotlin.coroutines.experimental.*
+import kotlin.test.*
+
+@RunWith(Parameterized::class)
+class SendReceiveJvmStressTest(private val channel: Channel<Int>) : TestBase() {
+
+ companion object {
+ @Parameterized.Parameters(name = "{0}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> = listOf(
+ ArrayChannel<Int>(1),
+ ArrayChannel<Int>(10),
+ ArrayChannel<Int>(1_000_000),
+ LinkedListChannel<Int>(),
+ RendezvousChannel<Int>()
+ ).map { arrayOf<Any>(it) }
+ }
+
+ @Test
+ fun testStress() = runTest {
+ val n = 100_000 * stressTestMultiplier
+ val sender = launch(coroutineContext) {
+ for (i in 1..n) {
+ channel.send(i)
+ }
+ expect(2)
+ }
+ val receiver = launch(coroutineContext) {
+ for (i in 1..n) {
+ val next = channel.receive()
+ check(next == i)
+ }
+ expect(3)
+ }
+ expect(1)
+ sender.join()
+ receiver.join()
+ finish(4)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveJvmTest.kt
similarity index 98%
rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
rename to core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveJvmTest.kt
index feb06e0..73639ac 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveJvmTest.kt
@@ -25,7 +25,7 @@
import kotlin.coroutines.experimental.*
@RunWith(Parameterized::class)
-class SimpleSendReceiveTest(
+class SimpleSendReceiveJvmTest(
val kind: TestChannelKind,
val n: Int,
val concurrent: Boolean
@@ -64,4 +64,4 @@
assertThat(expected, IsEqual(n))
}
}
-}
\ No newline at end of file
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectChannelStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectChannelStressTest.kt
new file mode 100644
index 0000000..097fe85
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectChannelStressTest.kt
@@ -0,0 +1,72 @@
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.intrinsics.*
+import kotlin.test.*
+
+class SelectChannelStressTest: TestBase() {
+
+ @Test
+ fun testSelectSendResourceCleanupArrayChannel() = runTest {
+ val channel = ArrayChannel<Int>(1)
+ val n = 10_000_000 * stressTestMultiplier
+ expect(1)
+ channel.send(-1) // fill the buffer, so all subsequent sends cannot proceed
+ repeat(n) { i ->
+ select {
+ channel.onSend(i) { expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ @Test
+ fun testSelectReceiveResourceCleanupArrayChannel() = runTest {
+ val channel = ArrayChannel<Int>(1)
+ val n = 10_000_000 * stressTestMultiplier
+ expect(1)
+ repeat(n) { i ->
+ select {
+ channel.onReceive { expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ @Test
+ fun testSelectSendResourceCleanupRendezvousChannel() = runTest {
+ val channel = RendezvousChannel<Int>()
+ val n = 1_000_000 * stressTestMultiplier
+ expect(1)
+ repeat(n) { i ->
+ select {
+ channel.onSend(i) { expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ @Test
+ fun testSelectReceiveResourceRendezvousChannel() = runTest {
+ val channel = RendezvousChannel<Int>()
+ val n = 1_000_000 * stressTestMultiplier
+ expect(1)
+ repeat(n) { i ->
+ select {
+ channel.onReceive { expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
+ this as SelectBuilderImpl // type assertion
+ if (!trySelect(null)) return
+ block.startCoroutineUndispatched(this)
+ }
+}
diff --git a/js/kotlinx-coroutines-core-js/README.md b/js/kotlinx-coroutines-core-js/README.md
index 21ed982..552d4e0 100644
--- a/js/kotlinx-coroutines-core-js/README.md
+++ b/js/kotlinx-coroutines-core-js/README.md
@@ -8,6 +8,8 @@
| ------------- | ------------- | ---------------- | ---------------
| [launch] | [Job] | [CoroutineScope] | Launches coroutine that does not have any result
| [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result
+| [produce][kotlinx.coroutines.experimental.channels.produce] | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
+
Coroutine dispatchers implementing [CoroutineDispatcher]:
@@ -23,6 +25,14 @@
| [NonCancellable] | A non-cancelable job that is always active
| [CoroutineExceptionHandler] | Handler for uncaught exception
+Synchronization primitives for coroutines:
+
+| **Name** | **Suspending functions** | **Description**
+| ---------- | ----------------------------------------------------------- | ---------------
+| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | Mutual exclusion
+| [Channel][kotlinx.coroutines.experimental.channels.Channel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send], [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | Communication channel (aka queue or exchanger)
+
+
Top-level suspending functions:
| **Name** | **Description**
@@ -37,6 +47,19 @@
helper function. [NonCancellable] job object is provided to suppress cancellation with
`run(NonCancellable) {...}` block of code.
+[Select][kotlinx.coroutines.experimental.selects.select] expression waits for the result of multiple suspending functions simultaneously:
+
+| **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version**
+| ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
+| [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted]
+| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
+| [SendChannel][kotlinx.coroutines.experimental.channels.SendChannel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send] | [onSend][kotlinx.coroutines.experimental.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.experimental.channels.SendChannel.offer]
+| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
+| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
+| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock]
+| none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none
+
+
# Package kotlinx.coroutines.experimental
General-purpose coroutine builders, contexts, and helper functions.
@@ -59,4 +82,31 @@
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html
[suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html
+[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
+[Job.onJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/on-join.html
+[Job.isCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/is-completed.html
+[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
+[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/on-await.html
+<!--- INDEX kotlinx.coroutines.experimental.sync -->
+[kotlinx.coroutines.experimental.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html
+[kotlinx.coroutines.experimental.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html
+[kotlinx.coroutines.experimental.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/on-lock.html
+[kotlinx.coroutines.experimental.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/try-lock.html
+<!--- INDEX kotlinx.coroutines.experimental.channels -->
+[kotlinx.coroutines.experimental.channels.produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
+[kotlinx.coroutines.experimental.channels.ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
+[kotlinx.coroutines.experimental.channels.ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
+[kotlinx.coroutines.experimental.channels.Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
+[kotlinx.coroutines.experimental.channels.SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
+[kotlinx.coroutines.experimental.channels.ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
+[kotlinx.coroutines.experimental.channels.SendChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/index.html
+[kotlinx.coroutines.experimental.channels.SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html
+[kotlinx.coroutines.experimental.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/offer.html
+[kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html
+[kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/poll.html
+[kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive-or-null.html
+[kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive-or-null.html
+<!--- INDEX kotlinx.coroutines.experimental.selects -->
+[kotlinx.coroutines.experimental.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html
+[kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-timeout.html
<!--- END -->
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
index 0220b2d..f73467e 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt
@@ -30,7 +30,7 @@
* **It is not printed to console/log by default uncaught exception handler**.
* (see [handleCoroutineException]).
*/
-public actual open class CancellationException actual constructor(message: String) : IllegalStateException(message)
+public actual open class CancellationException actual constructor(message: String?) : IllegalStateException(message)
/**
* Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled or completed
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt
new file mode 100644
index 0000000..1cab961
--- /dev/null
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt
@@ -0,0 +1,8 @@
+package kotlinx.coroutines.experimental.internal
+
+internal actual fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int) {
+ var destinationIndex = destinationStart
+ for (sourceIndex in srcPos until srcPos + length) {
+ destination[destinationIndex++] = source[sourceIndex]
+ }
+}
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt
new file mode 100644
index 0000000..938767c
--- /dev/null
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt
@@ -0,0 +1,10 @@
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * Closeable entity.
+ * @suppress **Deprecated**
+ */
+@Deprecated("No replacement, see specific use")
+public actual interface Closeable {
+ public actual fun close()
+}
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt
new file mode 100644
index 0000000..76c1a33
--- /dev/null
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt
@@ -0,0 +1,12 @@
+package kotlinx.coroutines.experimental.internal
+
+internal actual typealias ReentrantLock = NoOpLock
+
+internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = action()
+
+internal class NoOpLock {
+ fun tryLock() = true
+ fun unlock(): Unit {}
+}
+
+internal actual fun <E> subscriberList(): MutableList<E> = ArrayList()
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt
index 453bf84..30c3f3c 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt
@@ -31,6 +31,9 @@
@PublishedApi internal var _prev = this
@PublishedApi internal var _removed: Boolean = false
+ public val prev: Any get() = _prev
+ public val next: Any get() = _next
+
public inline val nextNode get() = _next
public inline val prevNode get() = _prev
public inline val isRemoved get() = _removed
@@ -107,6 +110,25 @@
protected override val affectedNode: Node get() = queue._prev
protected actual override fun onPrepare(affected: Node, next: Node): Any? = null
protected override fun onComplete() = queue.addLast(node)
+ protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit
+}
+
+/** @suppress **This is unstable API and it is subject to change.** */
+public actual open class RemoveFirstDesc<T> actual constructor(
+ actual val queue: LockFreeLinkedListNode
+) : AbstractAtomicDesc() {
+
+ @Suppress("UNCHECKED_CAST")
+ public actual val result: T get() = affectedNode as T
+ protected override val affectedNode: Node = queue.next as Node
+ protected actual open fun validatePrepared(node: T): Boolean = true
+ protected actual final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+ @Suppress("UNCHECKED_CAST")
+ validatePrepared(affectedNode as T)
+ return null
+ }
+ protected override fun onComplete() { queue.removeFirstOrNull() }
+ protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit
}
/** @suppress **This is unstable API and it is subject to change.** */
@@ -114,8 +136,19 @@
protected abstract val affectedNode: Node
protected actual abstract fun onPrepare(affected: Node, next: Node): Any?
protected abstract fun onComplete()
- actual final override fun prepare(op: AtomicOp<*>): Any? = onPrepare(affectedNode, affectedNode._next)
+
+ actual final override fun prepare(op: AtomicOp<*>): Any? {
+ val affected = affectedNode
+ val next = affected._next
+ val failure = failure(affected, next)
+ if (failure != null) return failure
+ return onPrepare(affected, next)
+ }
+
actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete()
+ protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default
+ protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds
+ protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
/** @suppress **This is unstable API and it is subject to change.** */
diff --git a/js/kotlinx-coroutines-core-js/src/test/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopyKtTest.kt b/js/kotlinx-coroutines-core-js/src/test/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopyKtTest.kt
new file mode 100644
index 0000000..564969a
--- /dev/null
+++ b/js/kotlinx-coroutines-core-js/src/test/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopyKtTest.kt
@@ -0,0 +1,14 @@
+package kotlinx.coroutines.experimental.internal
+
+import kotlin.test.*
+
+class ArrayCopyTest {
+
+ @Test
+ fun testArrayCopy() {
+ val source = Array(10, { it })
+ val destination = arrayOfNulls<Int>(7)
+ arraycopy(source, 2, destination, 1, 5)
+ assertEquals(listOf(null, 2, 3, 4, 5, 6, null), destination.toList())
+ }
+}