Channel.receiveOrNull becomes extension, internal receiveOrClosed added
* The corresponding ReceiveChannel methods are deprecated.
* Introduced corresponding extensions with the same semantic and generic
Any bound.
* Introduce internal ReceiveChannel.[on]receiveOrClosed
* Using internal inline class ValueOrClosed.
* To be stabilized and made public in the future when inline classes
ABI stabilizes.
* It is related to #330 but does not resolve it yet.
* Includes todos for future public ValueOrClose design.
* Simplify AbstractChannel select implementations.
* AbstractChannel implementation is optimized to avoid code
duplication in suspension of different receive methods:
receive, receiveOrNull, receiveOrClosed.
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 0f5c403..7d200fb 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -671,7 +671,9 @@
public static final fun minWith (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun onReceiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/selects/SelectClause1;
public static final fun partition (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun receiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduce (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduceIndexed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun requireNoNulls (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel;
@@ -744,12 +746,14 @@
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
public abstract fun cancel (Ljava/util/concurrent/CancellationException;)V
public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1;
+ public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
public abstract fun isEmpty ()Z
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
@@ -785,6 +789,23 @@
public static fun values ()[Lkotlinx/coroutines/channels/TickerMode;
}
+public final class kotlinx/coroutines/channels/ValueOrClosed {
+ public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion;
+ public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed;
+ public fun equals (Ljava/lang/Object;)Z
+ public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
+ public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
+ public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
+ public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object;
+ public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
+ public fun hashCode ()I
+ public static fun hashCode-impl (Ljava/lang/Object;)I
+ public static final fun isClosed-impl (Ljava/lang/Object;)Z
+ public fun toString ()Ljava/lang/String;
+ public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
+ public final synthetic fun unbox-impl ()Ljava/lang/Object;
+}
+
public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow {
public fun <init> ()V
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
diff --git a/build.gradle b/build.gradle
index a1813d1..a1a797f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -123,7 +123,6 @@
def platform = platformOf(it)
apply from: rootProject.file("gradle/compile-${platform}.gradle")
-
dependencies {
// See comment below for rationale, it will be replaced with "project" dependency
compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version"
@@ -135,6 +134,7 @@
tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all {
kotlinOptions.freeCompilerArgs += experimentalAnnotations.collect { "-Xuse-experimental=" + it }
kotlinOptions.freeCompilerArgs += "-progressive"
+ kotlinOptions.freeCompilerArgs += "-XXLanguage:+InlineClasses"
// Binary compatibility support
kotlinOptions.freeCompilerArgs += ["-Xdump-declarations-to=${buildDir}/visibilities.json"]
}
diff --git a/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt
new file mode 100644
index 0000000..303e6d1
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.channels
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class ChannelReceiveOrClosedTest : TestBase() {
+ @Test
+ fun testChannelOfThrowables() = runTest {
+ val channel = Channel<Throwable>()
+ launch {
+ channel.send(TestException1())
+ channel.close(TestException2())
+ }
+
+ val element = channel.receiveOrClosed()
+ assertTrue(element.isValue)
+ assertTrue(element.value is TestException1)
+ assertTrue(element.valueOrNull is TestException1)
+
+ val closed = channel.receiveOrClosed()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.closeCause is TestException2)
+ }
+
+ @Test
+ @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test
+ fun testNullableIntChanel() = runTest {
+ val channel = Channel<Int?>()
+ launch {
+ expect(2)
+ channel.send(1)
+ expect(3)
+ channel.send(null)
+
+ expect(6)
+ channel.close()
+ }
+
+ expect(1)
+ val element = channel.receiveOrClosed()
+ assertTrue(element.isValue)
+ assertEquals(1, element.value)
+ assertEquals(1, element.valueOrNull)
+ assertEquals("Value(1)", element.toString())
+ assertTrue(ValueOrClosed.value(1) == element) // Don't box
+
+ expect(4)
+ val nullElement = channel.receiveOrClosed()
+ assertTrue(nullElement.isValue)
+ assertNull(nullElement.value)
+ assertNull(nullElement.valueOrNull)
+ assertEquals("Value(null)", nullElement.toString())
+ assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box
+
+ expect(5)
+ val closed = channel.receiveOrClosed()
+ assertTrue(closed.isClosed)
+
+ val closed2 = channel.receiveOrClosed()
+ assertTrue(closed2.isClosed)
+ assertTrue(closed2.closeCause is ClosedReceiveChannelException)
+ finish(7)
+ }
+
+ @Test
+ @ExperimentalUnsignedTypes
+ fun testUIntChannel() = runTest {
+ val channel = Channel<UInt>()
+ launch {
+ expect(2)
+ channel.send(1u)
+ yield()
+ expect(4)
+ channel.send((Long.MAX_VALUE - 1).toUInt())
+ expect(5)
+ }
+
+ expect(1)
+ val element = channel.receiveOrClosed()
+ assertEquals(1u, element.value)
+
+ expect(3)
+ val element2 = channel.receiveOrClosed()
+ assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value)
+ finish(6)
+ }
+
+ @Test
+ fun testCancelChannel() = runTest {
+ val channel = Channel<Boolean>()
+ launch {
+ expect(2)
+ channel.cancel()
+ }
+
+ expect(1)
+ val closed = channel.receiveOrClosed()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.closeCause is ClosedReceiveChannelException)
+ finish(3)
+ }
+
+ @Test
+ @ExperimentalUnsignedTypes
+ fun testReceiveResultChannel() = runTest {
+ val channel = Channel<ValueOrClosed<UInt>>()
+ launch {
+ channel.send(ValueOrClosed.value(1u))
+ channel.send(ValueOrClosed.closed(TestException1()))
+ channel.close(TestException2())
+ }
+
+ val intResult = channel.receiveOrClosed()
+ assertTrue(intResult.isValue)
+ assertEquals(1u, intResult.value.value)
+
+ val closeCauseResult = channel.receiveOrClosed()
+ assertTrue(closeCauseResult.isValue)
+ assertTrue(closeCauseResult.value.closeCause is TestException1)
+
+ val closeCause = channel.receiveOrClosed()
+ assertTrue(closeCause.isClosed)
+ assertTrue(closeCause.closeCause is TestException2)
+ assertFailsWith<TestException2> { closeCause.valueOrThrow }
+ }
+
+ @Test
+ fun testToString() = runTest {
+ val channel = Channel<String>(1)
+ channel.send("message")
+ channel.close(TestException1())
+ assertEquals("Value(message)", channel.receiveOrClosed().toString())
+ // toString implementation for exception differs on every platform
+ val str = channel.receiveOrClosed().toString()
+ assertTrue(str.matches("Closed\\(.*TestException1\\)".toRegex()))
+ }
+}
diff --git a/docs/select-expression.md b/docs/select-expression.md
index a9bf7f2..35480ab 100644
--- a/docs/select-expression.md
+++ b/docs/select-expression.md
@@ -163,7 +163,7 @@
### Selecting on close
The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding
-`select` to throw an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] clause to perform a
+`select` to throw an exception. We can use [onReceiveOrNull][onReceiveOrNull] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:
@@ -189,6 +189,10 @@
</div>
+Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only
+for channels with non-nullable elements so that there is no accidental confusion between a closed channel
+and a null value.
+
Let's use it with channel `a` that produces "Hello" string four times and
channel `b` that produces "World" four times:
@@ -259,7 +263,7 @@
being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.
-The second observation, is that [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] gets immediately selected when the
+The second observation, is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the
channel is already closed.
### Selecting to send
@@ -433,7 +437,7 @@
Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
-[onReceiveOrNull][ReceiveChannel.onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
+[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
<div class="sample" markdown="1" theme="idea" data-highlight-only>
@@ -556,7 +560,7 @@
<!--- INDEX kotlinx.coroutines.channels -->
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
-[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
+[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
<!--- INDEX kotlinx.coroutines.selects -->
diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md
index 2dc751e..5fe3298 100644
--- a/kotlinx-coroutines-core/README.md
+++ b/kotlinx-coroutines-core/README.md
@@ -56,7 +56,7 @@
| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
-| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
+| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none
@@ -131,8 +131,8 @@
[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html
-[kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-or-null.html
-[kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
+[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html
+[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
<!--- INDEX kotlinx.coroutines.selects -->
[kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html
diff --git a/kotlinx-coroutines-core/common/README.md b/kotlinx-coroutines-core/common/README.md
index b84cedf..a0cc809 100644
--- a/kotlinx-coroutines-core/common/README.md
+++ b/kotlinx-coroutines-core/common/README.md
@@ -59,7 +59,7 @@
| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
-| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
+| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none
@@ -143,8 +143,8 @@
[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html
-[kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-or-null.html
-[kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html
+[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html
+[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
<!--- INDEX kotlinx.coroutines.selects -->
[kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
[kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 6419e20..bed4979 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -14,7 +14,6 @@
/**
* Abstract send channel. It is a base class for all send channel implementations.
- *
*/
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
/** @suppress **This is unstable API and it is subject to change.** */
@@ -542,13 +541,12 @@
public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
public final override val isEmpty: Boolean get() = queue.nextNode !is Send && isBufferEmpty
- @Suppress("UNCHECKED_CAST")
public final override suspend fun receive(): E {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return receiveResult(result)
// slow-path does suspend
- return receiveSuspend()
+ return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}
@Suppress("UNCHECKED_CAST")
@@ -558,8 +556,8 @@
}
@Suppress("UNCHECKED_CAST")
- private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine sc@ { cont ->
- val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
+ private suspend fun <R> receiveSuspend(onClose: Int): R = suspendAtomicCancellableCoroutine sc@ { cont ->
+ val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, onClose)
while (true) {
if (enqueueReceive(receive)) {
removeReceiveOnCancel(cont, receive)
@@ -568,11 +566,11 @@
// hm... something is not right. try to poll
val result = pollInternal()
if (result is Closed<*>) {
- cont.resumeWithException(result.receiveException)
+ receive.resumeReceiveClosed(result)
return@sc
}
if (result !== POLL_FAILED) {
- cont.resume(result as E)
+ cont.resume(receive.resumeValue(result as E))
return@sc
}
}
@@ -586,13 +584,12 @@
return result
}
- @Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrNull(): E? {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return receiveOrNullResult(result)
// slow-path does suspend
- return receiveOrNullSuspend()
+ return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
}
@Suppress("UNCHECKED_CAST")
@@ -605,27 +602,12 @@
}
@Suppress("UNCHECKED_CAST")
- private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine sc@ { cont ->
- val receive = ReceiveElement(cont, nullOnClose = true)
- while (true) {
- if (enqueueReceive(receive)) {
- removeReceiveOnCancel(cont, receive)
- return@sc
- }
- // hm... something is not right. try to poll
- val result = pollInternal()
- if (result is Closed<*>) {
- if (result.closeCause == null)
- cont.resume(null)
- else
- cont.resumeWithException(result.closeCause)
- return@sc
- }
- if (result !== POLL_FAILED) {
- cont.resume(result as E)
- return@sc
- }
- }
+ public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
+ // fast path -- try poll non-blocking
+ val result = pollInternal()
+ if (result !== POLL_FAILED) return result.toResult()
+ // slow-path does suspend
+ return receiveSuspend(RECEIVE_RESULT)
}
@Suppress("UNCHECKED_CAST")
@@ -694,9 +676,9 @@
private inner class TryEnqueueReceiveDesc<E, R>(
select: SelectInstance<R>,
- block: suspend (E?) -> R,
- nullOnClose: Boolean
- ) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
+ block: suspend (Any?) -> R,
+ receiveMode: Int
+ ) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, receiveMode)) {
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected is Send) return ENQUEUE_FAILED
return null
@@ -728,13 +710,7 @@
while (true) {
if (select.isSelected) return
if (isEmpty) {
- val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
- val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
- when {
- enqueueResult === ALREADY_SELECTED -> return
- enqueueResult === ENQUEUE_FAILED -> {} // retry
- else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
- }
+ if (registerEnqueueDesc(select, block, RECEIVE_THROWS_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -762,13 +738,7 @@
while (true) {
if (select.isSelected) return
if (isEmpty) {
- val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
- val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
- when {
- enqueueResult === ALREADY_SELECTED -> return
- enqueueResult === ENQUEUE_FAILED -> {} // retry
- else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
- }
+ if (registerEnqueueDesc(select, block, RECEIVE_NULL_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -793,6 +763,51 @@
}
}
+ override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
+ get() = object : SelectClause1<ValueOrClosed<E>> {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
+ registerSelectReceiveOrClosed(select, block)
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
+ while (true) {
+ if (select.isSelected) return
+ if (isEmpty) {
+ if (registerEnqueueDesc(select, block, RECEIVE_RESULT)) return
+ } else {
+ val pollResult = pollSelectInternal(select)
+ when {
+ pollResult === ALREADY_SELECTED -> return
+ pollResult === POLL_FAILED -> {} // retry
+ pollResult is Closed<*> -> {
+ block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion)
+ }
+ else -> {
+ // selected successfully
+ block.startCoroutineUnintercepted(ValueOrClosed.value(pollResult as E), select.completion)
+ return
+ }
+ }
+ }
+ }
+ }
+
+ private fun <R, E> registerEnqueueDesc(
+ select: SelectInstance<R>, block: suspend (E) -> R,
+ receiveMode: Int
+ ): Boolean {
+ @Suppress("UNCHECKED_CAST")
+ val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, receiveMode)
+ val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true
+ return when {
+ enqueueResult === ALREADY_SELECTED -> true
+ enqueueResult === ENQUEUE_FAILED -> false // retry
+ else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+ }
+ }
+
// ------ protected ------
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
@@ -884,18 +899,25 @@
}
private class ReceiveElement<in E>(
- @JvmField val cont: CancellableContinuation<E?>,
- @JvmField val nullOnClose: Boolean
+ @JvmField val cont: CancellableContinuation<Any?>,
+ @JvmField val receiveMode: Int
) : Receive<E>() {
- override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
+ fun resumeValue(value: E): Any? = when (receiveMode) {
+ RECEIVE_RESULT -> ValueOrClosed.value(value)
+ else -> value
+ }
+
+ @Suppress("IMPLICIT_CAST_TO_ANY")
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(resumeValue(value), idempotent)
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
override fun resumeReceiveClosed(closed: Closed<*>) {
- if (closed.closeCause == null && nullOnClose)
- cont.resume(null)
- else
- cont.resumeWithException(closed.receiveException)
+ when {
+ receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
+ receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
+ else -> cont.resumeWithException(closed.receiveException)
+ }
}
- override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
+ override fun toString(): String = "ReceiveElement[$cont,receiveMode=$receiveMode]"
}
private class ReceiveHasNext<E>(
@@ -940,25 +962,26 @@
private inner class ReceiveSelect<R, in E>(
@JvmField val select: SelectInstance<R>,
- @JvmField val block: suspend (E?) -> R,
- @JvmField val nullOnClose: Boolean
+ @JvmField val block: suspend (Any?) -> R,
+ @JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(token: Any) {
- block.startCoroutine(NULL_VALUE.unbox<E>(token), select.completion)
+ val value: E = NULL_VALUE.unbox<E>(token)
+ block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
}
override fun resumeReceiveClosed(closed: Closed<*>) {
- if (select.trySelect(null)) {
- if (closed.closeCause == null && nullOnClose) {
+ if (!select.trySelect(null)) return
+ when (receiveMode) {
+ RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
+ RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
+ RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutine(null, select.completion)
} else {
- // even though we are dispatching coroutine to process channel close on receive,
- // which is an atomically cancellable suspending function,
- // close is a final state, so we can use a cancellable resume mode
select.resumeSelectCancellableWithException(closed.receiveException)
}
}
@@ -973,7 +996,7 @@
onReceiveDequeued() // notify cancellation of receive
}
- override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
+ override fun toString(): String = "ReceiveSelect[$select,receiveMode=$receiveMode]"
}
private class IdempotentTokenValue<out E>(
@@ -982,6 +1005,11 @@
)
}
+// receiveMode values
+internal const val RECEIVE_THROWS_ON_CLOSE = 0
+internal const val RECEIVE_NULL_ON_CLOSE = 1
+internal const val RECEIVE_RESULT = 2
+
@JvmField
@SharedImmutable
internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
@@ -1076,3 +1104,10 @@
override val offerResult get() = OFFER_SUCCESS
abstract fun resumeReceiveClosed(closed: Closed<*>)
}
+
+@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
+private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
+ if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E)
+
+@Suppress("NOTHING_TO_INLINE")
+private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause)
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index 9fa3418..a2a6da2 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -15,6 +15,7 @@
import kotlinx.coroutines.internal.systemProp
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
+import kotlin.internal.*
/**
* Sender's interface to [Channel].
@@ -91,7 +92,8 @@
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
*
- * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send or receive.
+ * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send
+ * and [ClosedReceiveChannelException] on attempts to receive.
* A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
*/
@@ -209,26 +211,74 @@
* This function can be used in [select] invocation with [onReceiveOrNull] clause.
* Use [poll] to try receiving from this channel without waiting.
*
- * **Note: This is an obsolete api.**
- * This function will be replaced with `receiveOrClosed: ReceiveResult<E>` and
- * extension `suspend fun <E: Any> ReceiveChannel<E>.receiveOrNull(): E?`
- * It is obsolete because it does not distinguish closed channel and null elements.
+ * @suppress **Deprecated**: in favor of receiveOrClosed and receiveOrNull extension.
*/
@ObsoleteCoroutinesApi
+ @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+ @LowPriorityInOverloadResolution
+ @Deprecated(
+ message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull")
+ )
public suspend fun receiveOrNull(): E?
/**
* Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
- * is received from the channel or selects with `null` if if the channel
+ * is received from the channel or selects with `null` if the channel
* [isClosedForReceive] without cause. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
- * **Note: This is an experimental api.** This function may be replaced with a better one in the future.
+ * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension.
*/
- @ExperimentalCoroutinesApi
+ @ObsoleteCoroutinesApi
+ @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+ @LowPriorityInOverloadResolution
+ @Deprecated(
+ message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull")
+ )
public val onReceiveOrNull: SelectClause1<E?>
/**
+ * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty].
+ * This method returns [ValueOrClosed] with a value if element was successfully retrieved from the channel
+ * or [ValueOrClosed] with close cause if channel was closed.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This function can be used in [select] invocation with [onReceiveOrClosed] clause.
+ * Use [poll] to try receiving from this channel without waiting.
+ *
+ * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
+ * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
+ */
+ @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
+ public suspend fun receiveOrClosed(): ValueOrClosed<E>
+
+ /**
+ * Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
+ * that is received from the channel or selects with [ValueOrClosed] with a close cause if the channel
+ * [isClosedForReceive].
+ *
+ * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
+ * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
+ */
+ @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
+ public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
+
+ /**
* Retrieves and removes the element from this channel, or returns `null` if this channel is empty
* or is [isClosedForReceive] without cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
@@ -272,6 +322,107 @@
}
/**
+ * A discriminated union of [ReceiveChannel.receiveOrClosed] result,
+ * that encapsulates either successfully received element of type [T] from the channel or a close cause.
+ *
+ * :todo: Do not make it public before resolving todos in the code of this class.
+ *
+ * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
+ * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
+ */
+@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
+@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
+public inline class ValueOrClosed<out T>
+internal constructor(private val holder: Any?) {
+ /**
+ * Returns `true` if this instance represents received element.
+ * In this case [isClosed] returns `false`.
+ * todo: it is commented for now, because it is not used
+ */
+ //public val isValue: Boolean get() = holder !is Closed
+
+ /**
+ * Returns `true` if this instance represents close cause.
+ * In this case [isValue] returns `false`.
+ */
+ public val isClosed: Boolean get() = holder is Closed
+
+ /**
+ * Returns received value if this instance represents received value or throws [IllegalStateException] otherwise.
+ *
+ * :todo: Decide if it is needed how it shall be named with relation to [valueOrThrow]:
+ *
+ * So we have the following methods on ValueOrClosed: `value`, `valueOrNull`, `valueOrThrow`.
+ * On the other hand, the channel has the following receive variants:
+ * * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh?
+ * * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull`
+ * * `receiveOrClosed`
+ * For the sake of consider dropping this version of `value` and rename [valueOrThrow] to simply `value`.
+ */
+ @Suppress("UNCHECKED_CAST")
+ public val value: T
+ get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T
+
+ /**
+ * Returns received value if this element represents received value or `null` otherwise.
+ * :todo: Decide if it shall be made into extension that is available only for non-null T.
+ * Note: it might become inconsistent with kotlin.Result
+ */
+ @Suppress("UNCHECKED_CAST")
+ public val valueOrNull: T?
+ get() = if (holder is Closed) null else holder as T
+
+ /**
+ * :todo: Decide if it is needed how it shall be named with relation to [value].
+ * Note, that valueOrThrow rethrows the cause adding no meaningful information about the callsite,
+ * so if one is sure that ValueOrClosed is always value, this very property should be used.
+ * Otherwise, it could be very hard to locate the source of the exception.
+ * todo: it is commented for now, because it is not used
+ */
+ //@Suppress("UNCHECKED_CAST")
+ //public val valueOrThrow: T
+ // get() = if (holder is Closed) throw holder.exception else holder as T
+
+ /**
+ * Returns close cause of the channel if this instance represents close cause or throws
+ * [IllegalStateException] otherwise.
+ */
+ @Suppress("UNCHECKED_CAST")
+ public val closeCause: Throwable? get() =
+ if (holder is Closed) holder.cause else error("Channel was not closed")
+
+ /**
+ * @suppress
+ */
+ public override fun toString(): String =
+ when (holder) {
+ is Closed -> holder.toString()
+ else -> "Value($holder)"
+ }
+
+ internal class Closed(@JvmField val cause: Throwable?) {
+ // todo: it is commented for now, because it is not used
+ //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
+ override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
+ override fun hashCode(): Int = cause.hashCode()
+ override fun toString(): String = "Closed($cause)"
+ }
+
+ /**
+ * todo: consider making value/closed constructors public in the future.
+ */
+ internal companion object {
+ @Suppress("NOTHING_TO_INLINE")
+ internal inline fun <E> value(value: E): ValueOrClosed<E> =
+ ValueOrClosed(value)
+
+ @Suppress("NOTHING_TO_INLINE")
+ internal inline fun <E> closed(cause: Throwable?): ValueOrClosed<E> =
+ ValueOrClosed(Closed(cause))
+ }
+}
+
+/**
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
* from concurrent coroutines.
*/
diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
index 352c8c1..cd37bfb 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
@@ -8,6 +8,7 @@
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
+import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.jvm.*
@@ -34,6 +35,49 @@
}
/**
+ * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+ * or returns `null` if the channel is [closed][Channel.isClosedForReceive].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This extension is defined only for channels on non-null types, so that generic functions defined using
+ * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
+ * to find bugs.
+ */
+@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0
+public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
+ @Suppress("DEPRECATION", "UNCHECKED_CAST")
+ return (this as ReceiveChannel<E?>).receiveOrNull()
+}
+
+/**
+ * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
+ * is received from the channel or selects with `null` if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with
+ * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ *
+ * This extension is defined only for channels on non-null types, so that generic functions defined using
+ * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
+ * to find bugs.
+ **/
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0
+public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
+ @Suppress("DEPRECATION", "UNCHECKED_CAST")
+ return (this as ReceiveChannel<E?>).onReceiveOrNull
+}
+
+/**
* Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
*
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
index 167edba..a6ddd81 100644
--- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
@@ -35,6 +35,11 @@
}
@Test
+ fun testReceiveOrClosed() = runTest {
+ TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) }
+ }
+
+ @Test
fun testInvokeOnClose() = TestChannelKind.values().forEach { kind ->
reset()
val channel = kind.create()
@@ -124,6 +129,34 @@
assertTrue(d.getCancellationException().cause is TestException)
}
+ @Suppress("ReplaceAssertBooleanWithAssertEquality")
+ private suspend fun testReceiveOrClosed(kind: TestChannelKind) = coroutineScope {
+ reset()
+ val channel = kind.create()
+ launch {
+ expect(2)
+ channel.send(1)
+ }
+
+ expect(1)
+ val result = channel.receiveOrClosed()
+ assertEquals(1, result.value)
+ assertEquals(1, result.valueOrNull)
+ assertTrue(ValueOrClosed.value(1) == result)
+
+ expect(3)
+ launch {
+ expect(4)
+ channel.close()
+ }
+ val closed = channel.receiveOrClosed()
+ expect(5)
+ assertNull(closed.valueOrNull)
+ assertTrue(closed.isClosed)
+ assertNull(closed.closeCause)
+ assertTrue(ValueOrClosed.closed<Int>(closed.closeCause) == closed)
+ finish(6)
+ }
private suspend fun testOffer(kind: TestChannelKind) = coroutineScope {
val channel = kind.create()
diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
index 465699e..27c5816 100644
--- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
+++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
@@ -58,6 +58,7 @@
override suspend fun receive(): E = sub.receive()
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
+ override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
override fun poll(): E? = sub.poll()
override fun iterator(): ChannelIterator<E> = sub.iterator()
@@ -71,4 +72,6 @@
get() = sub.onReceive
override val onReceiveOrNull: SelectClause1<E?>
get() = sub.onReceiveOrNull
+ override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
+ get() = sub.onReceiveOrClosed
}
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
index f8c3439..ece95db 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
@@ -284,6 +284,104 @@
finish(10)
}
+ @Test
+ fun testSelectReceiveOrClosedWaitClosed() = runTest {
+ expect(1)
+ val channel = Channel<String>(1)
+ launch {
+ expect(3)
+ channel.close()
+ expect(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceiveOrClosed {
+ expect(5)
+ assertTrue(it.isClosed)
+ assertNull(it.closeCause)
+ }
+ }
+
+ finish(6)
+ }
+
+ @Test
+ fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest {
+ expect(1)
+ val channel = Channel<String>(1)
+ launch {
+ expect(3)
+ channel.close(TestException())
+ expect(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceiveOrClosed {
+ expect(5)
+ assertTrue(it.isClosed)
+ assertTrue(it.closeCause is TestException)
+ }
+ }
+
+ finish(6)
+ }
+
+ @Test
+ fun testSelectReceiveOrClosed() = runTest {
+ val c = Channel<Int>(1)
+ val iterations = 10
+ expect(1)
+ val job = launch {
+ repeat(iterations) {
+ select<Unit> {
+ c.onReceiveOrClosed { v ->
+ expect(4 + it * 2)
+ assertEquals(it, v.value)
+ }
+ }
+ }
+ }
+
+ expect(2)
+ repeat(iterations) {
+ expect(3 + it * 2)
+ c.send(it)
+ yield()
+ }
+
+ job.join()
+ finish(3 + iterations * 2)
+ }
+
+ @Test
+ fun testSelectReceiveOrClosedDispatch() = runTest {
+ val c = Channel<Int>(1)
+ expect(1)
+ launch {
+ expect(3)
+ val res = select<String> {
+ c.onReceiveOrClosed { v ->
+ expect(6)
+ assertEquals(42, v.value)
+ yield() // back to main
+ expect(8)
+ "OK"
+ }
+ }
+ expect(9)
+ assertEquals("OK", res)
+ }
+ expect(2)
+ yield() // to launch
+ expect(4)
+ c.send(42) // do not suspend
+ expect(5)
+ yield() // to receive
+ expect(7)
+ yield() // again
+ finish(10)
+ }
+
// only for debugging
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
this as SelectBuilderImpl // type assertion
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
index 0c1f9f6..ed8b8d3 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
@@ -305,6 +305,104 @@
finish(10)
}
+ @Test
+ fun testSelectReceiveOrClosedWaitClosed() = runTest {
+ expect(1)
+ val channel = Channel<String>(Channel.RENDEZVOUS)
+ launch {
+ expect(3)
+ channel.close()
+ expect(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceiveOrClosed {
+ expect(5)
+ assertTrue(it.isClosed)
+ assertNull(it.closeCause)
+ }
+ }
+
+ finish(6)
+ }
+
+ @Test
+ fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest {
+ expect(1)
+ val channel = Channel<String>(Channel.RENDEZVOUS)
+ launch {
+ expect(3)
+ channel.close(TestException())
+ expect(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceiveOrClosed {
+ expect(5)
+ assertTrue(it.isClosed)
+ assertTrue(it.closeCause is TestException)
+ }
+ }
+
+ finish(6)
+ }
+
+ @Test
+ fun testSelectReceiveOrClosed() = runTest {
+ val channel = Channel<Int>(Channel.RENDEZVOUS)
+ val iterations = 10
+ expect(1)
+ val job = launch {
+ repeat(iterations) {
+ select<Unit> {
+ channel.onReceiveOrClosed { v ->
+ expect(4 + it * 2)
+ assertEquals(it, v.value)
+ }
+ }
+ }
+ }
+
+ expect(2)
+ repeat(iterations) {
+ expect(3 + it * 2)
+ channel.send(it)
+ yield()
+ }
+
+ job.join()
+ finish(3 + iterations * 2)
+ }
+
+ @Test
+ fun testSelectReceiveOrClosedDispatch() = runTest {
+ val c = Channel<Int>(Channel.RENDEZVOUS)
+ expect(1)
+ launch {
+ expect(3)
+ val res = select<String> {
+ c.onReceiveOrClosed { v ->
+ expect(6)
+ assertEquals(42, v.value)
+ yield() // back to main
+ expect(8)
+ "OK"
+ }
+ }
+ expect(9)
+ assertEquals("OK", res)
+ }
+ expect(2)
+ yield() // to launch
+ expect(4)
+ c.send(42) // do not suspend
+ expect(5)
+ yield() // to receive
+ expect(7)
+ yield() // again
+ finish(10)
+ }
+
// only for debugging
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
this as SelectBuilderImpl // type assertion
diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
index cffe6c0..5178907 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
@@ -112,18 +112,15 @@
var sum = 0
var n = 0
whileSelect {
- this@averageInTimeWindow.onReceiveOrNull {
- when (it) {
- null -> {
- // Send leftovers and bail out
- if (n != 0) send(sum / n.toDouble())
- false
- }
- else -> {
- sum += it
- ++n
- true
- }
+ this@averageInTimeWindow.onReceiveOrClosed {
+ if (it.isClosed) {
+ // Send leftovers and bail out
+ if (n != 0) send(sum / n.toDouble())
+ false
+ } else {
+ sum += it.value
+ ++n
+ true
}
}