Fixes linearizability of Channel.close in advanced receive+send case
We cannot resume closed receives until all receivers are removed from the list.
Consider channel state: head -> [receive_1] -> [receive_2] -> head
- T1 called receive_2, and will call send() when it's receive call resumes
- T2 calls close()
Now if T2's close resumes T1's receive_2 then it's receive gets
"closed for receive" exception, but its subsequent attempt to send
successfully rendezvous with receive_1, producing non-linearizable
execution.
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index a3be3ba..b5dfd95 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -287,33 +287,49 @@
private fun helpClose(closed: Closed<*>) {
/*
* It's important to traverse list from right to left to avoid races with sender.
- * Consider channel state
- * head sentinel -> [receiver 1] -> [receiver 2] -> head sentinel
- * T1 invokes receive()
- * T2 invokes close()
- * T3 invokes close() + send(value)
+ * Consider channel state: head -> [receive_1] -> [receive_2] -> head
+ * - T1 calls receive()
+ * - T2 calls close()
+ * - T3 calls close() + send(value)
*
* If both will traverse list from left to right, following non-linearizable history is possible:
* [close -> false], [send -> transferred 'value' to receiver]
+ *
+ * Another problem with linearizability of close is that we cannot resume closed receives until all
+ * receivers are removed from the list.
+ * Consider channel state: head -> [receive_1] -> [receive_2] -> head
+ * - T1 called receive_2, and will call send() when it's receive call resumes
+ * - T2 calls close()
+ *
+ * Now if T2's close resumes T1's receive_2 then it's receive gets "closed for receive" exception, but
+ * its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
*/
+ var closedNode: Receive<E>? = null // used when one node was closed to avoid extra memory allocation
+ var closedList: ArrayList<Receive<E>>? = null // used when more nodes were closed
while (true) {
- val previous = closed.prevNode
- // Channel is empty or has no receivers
- if (previous is LockFreeLinkedListHead || previous !is Receive<*>) {
- break
- }
-
+ // Break when channel is empty or has no receivers
+ @Suppress("UNCHECKED_CAST")
+ val previous = closed.prevNode as? Receive<E> ?: break
if (!previous.remove()) {
// failed to remove the node (due to race) -- retry finding non-removed prevNode
// NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
previous.helpRemove() // make sure remove is complete before continuing
continue
}
-
- @Suppress("UNCHECKED_CAST")
- previous as Receive<E> // type assertion
- previous.resumeReceiveClosed(closed)
+ // add removed nodes to a separate list
+ if (closedNode == null) {
+ closedNode = previous
+ } else {
+ val list = closedList ?: ArrayList<Receive<E>>().also { closedList = it }
+ list += previous
+ }
}
+ // now notify all removed nodes that the channel was closed
+ if (closedNode != null) {
+ closedNode.resumeReceiveClosed(closed)
+ closedList?.forEach { it.resumeReceiveClosed(closed) }
+ }
+ // and do other post-processing
onClosedIdempotent(closed)
}