Properly cancel ChannelCoroutine when the channel was closed or cancelled (#2507)
Fixes #2506
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index bb7feef..9721583 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -642,7 +642,13 @@
cancelInternal(cause)
final override fun cancel(cause: CancellationException?) {
- if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
+ /*
+ * Do not create an exception if channel is already cancelled.
+ * Channel is closed for receive when either it is cancelled (then we are free to bail out)
+ * or was closed and elements were received.
+ * Then `onCancelIdempotent` does nothing for all implementations.
+ */
+ if (isClosedForReceive) return
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}
diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
index 9ceb77d..b2b257d 100644
--- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -26,7 +26,7 @@
}
final override fun cancel(cause: CancellationException?) {
- if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
+ if (isCancelled) return // Do not create an exception if the coroutine (-> the channel) is already cancelled
cancelInternal(cause ?: defaultCancellationException())
}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt
index 601c238..5513dab 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt
@@ -1,3 +1,7 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
package kotlinx.coroutines.channels
import kotlinx.atomicfu.*
@@ -115,4 +119,4 @@
check(!_cancelled.getAndSet(true)) { "Already cancelled" }
}
}
-}
\ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
index 6ddde00..194504e 100644
--- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -96,6 +96,27 @@
}
@Test
+ fun testCancelWhenTheChannelIsClosed() = runTest {
+ val channel = produce<Int> {
+ send(1)
+ close()
+ expect(2)
+ launch {
+ expect(3)
+ hang { expect(5) }
+ }
+ }
+
+ expect(1)
+ channel.receive()
+ yield()
+ expect(4)
+ channel.cancel()
+ (channel as Job).join()
+ finish(6)
+ }
+
+ @Test
fun testAwaitConsumerCancellation() = runTest {
val parent = Job()
val channel = produce<Int>(parent) {
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index b115150..31a929b 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
@@ -194,4 +194,17 @@
assertEquals(listOf(1), flow.toList())
finish(3)
}
+
+ @Test
+ fun testCancelledOnCompletion() = runTest {
+ val myFlow = callbackFlow<Any> {
+ expect(2)
+ close()
+ hang { expect(3) }
+ }
+
+ expect(1)
+ myFlow.collect()
+ finish(4)
+ }
}