Fixed race in a new Job with listeners between start & cancel
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index 0dded2a..5edeeef 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -373,10 +373,10 @@
onStartInternal()
return TRUE
}
- is NodeList -> { // LIST -- a list of completion handlers (either new or active)
- return state.tryMakeActive().also { result ->
- if (result == TRUE) onStartInternal()
- }
+ is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
+ if (!_state.compareAndSet(state, state.list)) return RETRY
+ onStartInternal()
+ return TRUE
}
else -> return FALSE // not a new state
}
@@ -486,13 +486,15 @@
list.addLastIf(node) { this.state === expect }
private fun promoteEmptyToNodeList(state: Empty) {
- // try to promote it to list in new state
- _state.compareAndSet(state, NodeList(state.isActive))
+ // try to promote it to LIST state with the corresponding state
+ val list = NodeList()
+ val update = if (state.isActive) list else InactiveNodeList(list)
+ _state.compareAndSet(state, update)
}
private fun promoteSingleToNodeList(state: JobNode<*>) {
// try to promote it to list (SINGLE+ state)
- state.addOneIfEmpty(NodeList(active = true))
+ state.addOneIfEmpty(NodeList())
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
val list = state.nextNode // either our NodeList or somebody else won the race, updated state
// just attempt converting it to list if state is still the same, then we'll continue lock-free loop
@@ -597,14 +599,13 @@
is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
promoteSingleToNodeList(state)
}
- is NodeList -> { // LIST -- a list of completion handlers (either new or active)
- if (state.isActive) {
- if (tryMakeCancelling(state, state.list, cause)) return true
- } else {
- // cancelling a non-started coroutine makes it immediately cancelled
- if (updateStateCancelled(state, cause))
- return true
- }
+ is NodeList -> { // LIST -- active list of completion handlers
+ if (tryMakeCancelling(state, state.list, cause)) return true
+ }
+ is InactiveNodeList -> { // LIST -- inactive list of completion handlers
+ // cancelling a non-started coroutine makes it immediately cancelled
+ if (updateStateCancelled(state, cause))
+ return true
}
is Finishing -> { // Completing/Cancelling the job, may cancel
if (state.cancelled != null) {
@@ -1079,24 +1080,14 @@
override fun dispose() = (job as JobSupport).removeNode(this)
}
-internal class NodeList(
- active: Boolean
-) : LockFreeLinkedListHead(), Incomplete {
- private val _active = atomic(if (active) 1 else 0)
-
- override val isActive: Boolean get() = _active.value != 0
+internal class NodeList : LockFreeLinkedListHead(), Incomplete {
+ override val isActive: Boolean get() = true
override val list: NodeList get() = this
- fun tryMakeActive(): Int {
- if (_active.value != 0) return FALSE
- if (_active.compareAndSet(0, 1)) return TRUE
- return RETRY
- }
-
- override fun toString(): String = buildString {
- append("List")
- append(if (isActive) "{Active}" else "{New}")
- append("[")
+ fun getString(state: String) = buildString {
+ append("List{")
+ append(state)
+ append("}[")
var first = true
this@NodeList.forEach<JobNode<*>> { node ->
if (first) first = false else append(", ")
@@ -1104,6 +1095,15 @@
}
append("]")
}
+
+ override fun toString(): String = getString("Active")
+}
+
+internal class InactiveNodeList(
+ override val list: NodeList
+) : Incomplete {
+ override val isActive: Boolean get() = false
+ override fun toString(): String = list.getString("New")
}
private class InvokeOnCompletion(
diff --git a/core/kotlinx-coroutines-core/test/JobActivationStressTest.kt b/core/kotlinx-coroutines-core/test/JobActivationStressTest.kt
new file mode 100644
index 0000000..fa1927c
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/JobActivationStressTest.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.junit.*
+import org.junit.Test
+import java.util.concurrent.*
+import kotlin.test.*
+
+class JobActivationStressTest : TestBase() {
+ private val N_ITERATIONS = 10_000 * stressTestMultiplier
+ private val pool = newFixedThreadPoolContext(3, "JobActivationStressTest")
+
+ @After
+ fun tearDown() {
+ pool.close()
+ }
+
+ /**
+ * Perform concurrent start & cancel of a job with prior installed completion handlers
+ */
+ @Test
+ @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
+ fun testActivation() = runTest {
+ val barrier = CyclicBarrier(3)
+ val scope = CoroutineScope(pool)
+ repeat(N_ITERATIONS) {
+ var wasStarted = false
+ val d = scope.async(start = CoroutineStart.LAZY) {
+ wasStarted = true
+ throw TestException()
+ }
+ // need to add on completion handler
+ val causeHolder = object {
+ var cause: Throwable? = null
+ }
+ // we use synchronization on causeHolder to work around the fact that completion listeners
+ // are invoked after the job is in the final state, so when "d.join()" completes there is
+ // no guarantee that this listener was already invoked
+ d.invokeOnCompletion {
+ synchronized(causeHolder) {
+ causeHolder.cause = it ?: Error("Empty cause")
+ (causeHolder as Object).notifyAll()
+ }
+ }
+ // concurrent cancel
+ val canceller = scope.launch {
+ barrier.await()
+ d.cancel()
+ }
+ // concurrent cancel
+ val starter = scope.launch {
+ barrier.await()
+ d.start()
+ }
+ barrier.await()
+ joinAll(d, canceller, starter)
+ if (wasStarted) {
+ val exception = d.getCompletionExceptionOrNull()
+ assertTrue(exception is TestException, "exception=$exception")
+ val cause = synchronized(causeHolder) {
+ while (causeHolder.cause == null) (causeHolder as Object).wait()
+ causeHolder.cause
+ }
+ assertTrue(cause is TestException, "cause=$cause")
+ }
+ }
+ }
+
+ private class TestException : Exception()
+}
\ No newline at end of file