Make ReceiveChannel.cancel linearizability-friendly
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index be18942..30156ea 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -624,22 +624,36 @@
}
// It needs to be internal to support deprecated cancel(Throwable?) API
- internal open fun cancelInternal(cause: Throwable?): Boolean =
+ internal fun cancelInternal(cause: Throwable?): Boolean =
close(cause).also {
- cleanupSendQueueOnCancel()
+ onCancelIdempotent(it)
}
- // Note: this function is invoked when channel is already closed
- protected open fun cleanupSendQueueOnCancel() {
+ /**
+ * Method that is invoked right after [close] in [cancel] sequence.
+ * [wasClosed] is directly mapped to the value returned by [close].
+ */
+ protected open fun onCancelIdempotent(wasClosed: Boolean) {
+ /*
+ * See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
+ * has the same rationale.
+ */
val closed = closedForSend ?: error("Cannot happen")
+ var list = InlineList<Send>()
while (true) {
- val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
- if (send is Closed<*>) {
- assert { send === closed }
- return // cleaned
+ val previous = closed.prevNode
+ if (previous is LockFreeLinkedListHead) {
+ break
}
- send.resumeSendClosed(closed)
+ assert { previous is Send }
+ if (!previous.remove()) {
+ previous.helpRemove() // make sure remove is complete before continuing
+ continue
+ }
+ // Add to the list only **after** successful removal
+ list += previous as Send
}
+ list.forEachReversed { it.resumeSendClosed(closed) }
}
public final override fun iterator(): ChannelIterator<E> = Itr(this)
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
index 7fbf0c3..49fce27 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
@@ -208,15 +208,12 @@
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
override val isBufferFull: Boolean get() = error("Should not be used")
- override fun cancelInternal(cause: Throwable?): Boolean =
- close(cause).also { closed ->
- if (closed) broadcastChannel.updateHead(removeSub = this)
- clearBuffer()
- }
-
- private fun clearBuffer() {
- subLock.withLock {
- subHead = broadcastChannel.tail
+ override fun onCancelIdempotent(wasClosed: Boolean) {
+ if (wasClosed) {
+ broadcastChannel.updateHead(removeSub = this)
+ subLock.withLock {
+ subHead = broadcastChannel.tail
+ }
}
}
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
index 1e1c0d3..96302dd 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
@@ -241,17 +241,19 @@
}
// Note: this function is invoked when channel is already closed
- override fun cleanupSendQueueOnCancel() {
- // clear buffer first
- lock.withLock {
- repeat(size) {
- buffer[head] = 0
- head = (head + 1) % buffer.size
+ override fun onCancelIdempotent(wasClosed: Boolean) {
+ // clear buffer first, but do not wait for it in helpers
+ if (wasClosed) {
+ lock.withLock {
+ repeat(size) {
+ buffer[head] = 0
+ head = (head + 1) % buffer.size
+ }
+ size = 0
}
- size = 0
}
// then clean all queued senders
- super.cleanupSendQueueOnCancel()
+ super.onCancelIdempotent(wasClosed)
}
// ------ debug ------
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
index b242639..61b1f7a 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
@@ -284,10 +284,12 @@
private class Subscriber<E>(
private val broadcastChannel: ConflatedBroadcastChannel<E>
) : ConflatedChannel<E>(), ReceiveChannel<E> {
- override fun cancelInternal(cause: Throwable?): Boolean =
- close(cause).also { closed ->
- if (closed) broadcastChannel.closeSubscriber(this)
+
+ override fun onCancelIdempotent(wasClosed: Boolean) {
+ if (wasClosed) {
+ broadcastChannel.closeSubscriber(this)
}
+ }
public override fun offerInternal(element: E): Any = super.offerInternal(element)
}