ConflatedChannel with lock to protect the one-element buffer, double-linked list used for suspending receivers only.
diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle
index d4b2b89..8323214 100644
--- a/kotlinx-coroutines-core/build.gradle
+++ b/kotlinx-coroutines-core/build.gradle
@@ -107,7 +107,7 @@
testClassesDirs = files { jvmTest.testClassesDirs }
executable = "$System.env.JDK_16/bin/java"
exclude '**/*LFStressTest.*' // lock-freedom tests use LockFreedomTestEnvironment which needs JDK8
- exclude '**/*LCStressTest.*' // lic-check tests use LinChecker which needs JDK8
+ exclude '**/*LCStressTest.*' // lin-check tests use LinChecker which needs JDK8
exclude '**/exceptions/**' // exceptions tests check suppressed exception which needs JDK8
exclude '**/ExceptionsGuideTest.*'
}
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
index c04ccc4..ed0f370 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
@@ -1,11 +1,9 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
package kotlinx.coroutines.channels
-import kotlinx.coroutines.selects.*
+import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.selects.*
+import kotlin.native.concurrent.SharedImmutable
/**
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
@@ -15,80 +13,124 @@
* Sender to this channel never suspends and [offer] always returns `true`.
*
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
- *
- * This implementation is fully lock-free.
*/
internal open class ConflatedChannel<E> : AbstractChannel<E>() {
- protected final override val isBufferAlwaysEmpty: Boolean get() = true
- protected final override val isBufferEmpty: Boolean get() = true
+ protected final override val isBufferAlwaysEmpty: Boolean get() = false
+ protected final override val isBufferEmpty: Boolean get() = value === EMPTY
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = false
- override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
- @Suppress("UNCHECKED_CAST")
- (closed.prevNode as? SendBuffered<E>)?.let { lastBuffered ->
- conflatePreviousSendBuffered(lastBuffered)
- }
+ override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
+
+ private val lock = ReentrantLock()
+
+ private var value: Any? = EMPTY
+
+ private companion object {
+ @SharedImmutable
+ private val EMPTY = Symbol("EMPTY")
}
- /**
- * Queues conflated element, returns null on success or
- * returns node reference if it was already closed or is waiting for receive.
- */
- private fun sendConflated(element: E): ReceiveOrClosed<*>? {
- val node = SendBuffered(element)
- queue.addLastIfPrev(node) { prev ->
- if (prev is ReceiveOrClosed<*>) return@sendConflated prev
- true
- }
- conflatePreviousSendBuffered(node)
- return null
- }
-
- private fun conflatePreviousSendBuffered(node: SendBuffered<E>) {
- // Conflate all previous SendBuffered, helping other sends to conflate
- var prev = node.prevNode
- while (prev is SendBuffered<*>) {
- if (!prev.remove()) {
- prev.helpRemove()
- }
- prev = prev.prevNode
- }
- }
-
- // result is always `OFFER_SUCCESS | Closed`
+ // result is `OFFER_SUCCESS | Closed`
protected override fun offerInternal(element: E): Any {
- while (true) {
- val result = super.offerInternal(element)
- when {
- result === OFFER_SUCCESS -> return OFFER_SUCCESS
- result === OFFER_FAILED -> { // try to buffer
- when (val sendResult = sendConflated(element)) {
- null -> return OFFER_SUCCESS
- is Closed<*> -> return sendResult
+ var receive: ReceiveOrClosed<E>? = null
+ lock.withLock {
+ closedForSend?.let { return it }
+ // if there is no element written in buffer
+ if (value === EMPTY) {
+ // check for receivers that were waiting on the empty buffer
+ loop@ while(true) {
+ receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
+ if (receive is Closed) {
+ return receive!!
}
- // otherwise there was receiver in queue, retry super.offerInternal
+ val token = receive!!.tryResumeReceive(element, null)
+ if (token != null) {
+ assert { token === RESUME_TOKEN }
+ return@withLock
+ }
}
- result is Closed<*> -> return result
- else -> error("Invalid offerInternal result $result")
}
+ value = element
+ return OFFER_SUCCESS
}
+ // breaks here if offer meets receiver
+ receive!!.completeResumeReceive(element)
+ return receive!!.offerResult
}
- // result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
- while (true) {
- val result = if (hasReceiveOrClosed)
- super.offerSelectInternal(element, select) else
- (select.performAtomicTrySelect(describeSendConflated(element)) ?: OFFER_SUCCESS)
- when {
- result === ALREADY_SELECTED -> return ALREADY_SELECTED
- result === OFFER_SUCCESS -> return OFFER_SUCCESS
- result === OFFER_FAILED -> {} // retry
- result === RETRY_ATOMIC -> {} // retry
- result is Closed<*> -> return result
- else -> error("Invalid result $result")
+ var receive: ReceiveOrClosed<E>? = null
+ lock.withLock {
+ closedForSend?.let { return it }
+ if (value === EMPTY) {
+ loop@ while(true) {
+ val offerOp = describeTryOffer(element)
+ val failure = select.performAtomicTrySelect(offerOp)
+ when {
+ failure == null -> { // offered successfully
+ receive = offerOp.result
+ return@withLock
+ }
+ failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
+ failure === RETRY_ATOMIC -> {} // retry
+ failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
+ else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+ }
+ }
+ }
+ // try to select sending this element to buffer
+ if (!select.trySelect()) {
+ return ALREADY_SELECTED
+ }
+ value = element
+ return OFFER_SUCCESS
+ }
+ // breaks here if offer meets receiver
+ receive!!.completeResumeReceive(element)
+ return receive!!.offerResult
+ }
+
+ // result is `E | POLL_FAILED | Closed`
+ protected override fun pollInternal(): Any? {
+ var result: Any? = null
+ lock.withLock {
+ if (value === EMPTY) return closedForSend ?: POLL_FAILED
+ result = value
+ value = EMPTY
+ }
+ return result
+ }
+
+ // result is `E | POLL_FAILED | Closed`
+ protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ var result: Any? = null
+ lock.withLock {
+ if (value === EMPTY) return closedForSend ?: POLL_FAILED
+ if (!select.trySelect())
+ return ALREADY_SELECTED
+ result = value
+ value = EMPTY
+ }
+ return result
+ }
+
+ protected override fun onCancelIdempotent(wasClosed: Boolean) {
+ if (wasClosed) {
+ lock.withLock {
+ value = EMPTY
}
}
+ super.onCancelIdempotent(wasClosed)
}
-}
+
+ override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
+ super.enqueueReceiveInternal(receive)
+ }
+
+ // ------ debug ------
+
+ override val bufferDebugString: String
+ get() = "(value=$value)"
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt
index 75e34e5..256ef62 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt
@@ -41,14 +41,6 @@
checkAllReceived()
}
- @Test
- fun testConflatedLockFreedom() {
- // This test does not really verify that all sent elements were received
- // and checks only LF property
- channel = Channel(Channel.CONFLATED)
- performLockFreedomTest()
- }
-
private fun performLockFreedomTest() {
env.onCompletion {
// We must cancel the channel to abort both senders & receivers
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt
index 625c620..8836fdc 100644
--- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt
@@ -17,7 +17,6 @@
import org.jetbrains.kotlinx.lincheck.verifier.*
import org.junit.*
-
class RendezvousChannelLCStressTest : ChannelLCStressTestBase(
c = Channel(RENDEZVOUS),
sequentialSpecification = SequentialRendezvousChannel::class.java
@@ -48,7 +47,6 @@
)
class SequentialConflatedChannel : SequentialIntChannelBase(CONFLATED)
-
@Param.Params(
Param(name = "value", gen = IntGen::class, conf = "1:5"),
Param(name = "closeToken", gen = IntGen::class, conf = "1:3")
@@ -105,10 +103,10 @@
// @Operation
fun cancel(@Param(name = "closeToken") token: Int) = c.cancel(NumberedCancellationException(token))
-// @Operation
+ // @Operation
fun isClosedForReceive() = c.isClosedForReceive
-// @Operation
+ // @Operation
fun isClosedForSend() = c.isClosedForSend
// TODO: this operation should be (and can be!) linearizable, but is not