Make ReceiveChannel.cancel linearizability-friendly

diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index be18942..30156ea 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -624,22 +624,36 @@
     }
 
     // It needs to be internal to support deprecated cancel(Throwable?) API
-    internal open fun cancelInternal(cause: Throwable?): Boolean =
+    internal fun cancelInternal(cause: Throwable?): Boolean =
         close(cause).also {
-            cleanupSendQueueOnCancel()
+            onCancelIdempotent(it)
         }
 
-    // Note: this function is invoked when channel is already closed
-    protected open fun cleanupSendQueueOnCancel() {
+    /**
+     * Method that is invoked right after [close] in [cancel] sequence.
+     * [wasClosed] is directly mapped to the value returned by [close].
+     */
+    protected open fun onCancelIdempotent(wasClosed: Boolean) {
+        /*
+         * See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
+         * has the same rationale.
+         */
         val closed = closedForSend ?: error("Cannot happen")
+        var list = InlineList<Send>()
         while (true) {
-            val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
-            if (send is Closed<*>) {
-                assert { send === closed }
-                return // cleaned
+            val previous = closed.prevNode
+            if (previous is LockFreeLinkedListHead) {
+                break
             }
-            send.resumeSendClosed(closed)
+            assert { previous is Send }
+            if (!previous.remove()) {
+                previous.helpRemove() // make sure remove is complete before continuing
+                continue
+            }
+            // Add to the list only **after** successful removal
+            list += previous as Send
         }
+        list.forEachReversed { it.resumeSendClosed(closed) }
     }
 
     public final override fun iterator(): ChannelIterator<E> = Itr(this)
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
index 7fbf0c3..49fce27 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
@@ -208,15 +208,12 @@
         override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
         override val isBufferFull: Boolean get() = error("Should not be used")
 
-        override fun cancelInternal(cause: Throwable?): Boolean =
-            close(cause).also { closed ->
-                if (closed) broadcastChannel.updateHead(removeSub = this)
-                clearBuffer()
-            }
-
-        private fun clearBuffer() {
-            subLock.withLock {
-                subHead = broadcastChannel.tail
+        override fun onCancelIdempotent(wasClosed: Boolean) {
+            if (wasClosed) {
+                broadcastChannel.updateHead(removeSub = this)
+                subLock.withLock {
+                    subHead = broadcastChannel.tail
+                }
             }
         }
 
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
index 1e1c0d3..96302dd 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
@@ -241,17 +241,19 @@
     }
 
     // Note: this function is invoked when channel is already closed
-    override fun cleanupSendQueueOnCancel() {
-        // clear buffer first
-        lock.withLock {
-            repeat(size) {
-                buffer[head] = 0
-                head = (head + 1) % buffer.size
+    override fun onCancelIdempotent(wasClosed: Boolean) {
+        // clear buffer first, but do not wait for it in helpers
+        if (wasClosed) {
+            lock.withLock {
+                repeat(size) {
+                    buffer[head] = 0
+                    head = (head + 1) % buffer.size
+                }
+                size = 0
             }
-            size = 0
         }
         // then clean all queued senders
-        super.cleanupSendQueueOnCancel()
+        super.onCancelIdempotent(wasClosed)
     }
 
     // ------ debug ------
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
index b242639..61b1f7a 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
@@ -284,10 +284,12 @@
     private class Subscriber<E>(
         private val broadcastChannel: ConflatedBroadcastChannel<E>
     ) : ConflatedChannel<E>(), ReceiveChannel<E> {
-        override fun cancelInternal(cause: Throwable?): Boolean =
-            close(cause).also { closed ->
-                if (closed) broadcastChannel.closeSubscriber(this)
+
+        override fun onCancelIdempotent(wasClosed: Boolean) {
+            if (wasClosed) {
+                broadcastChannel.closeSubscriber(this)
             }
+        }
 
         public override fun offerInternal(element: E): Any = super.offerInternal(element)
     }