Fixed IllegalStateException in select that concurrently selects, which
was due a problem with Job concurrent cancel and dispose of handle.
The bug was introduced by addition of Cancelling state for a Job.
Original stack trace in select was:
java.lang.IllegalStateException: Check failed.
at kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode.remove(LockFreeLinkedList.kt:236)
at kotlinx.coroutines.experimental.JobSupport.removeNode$kotlinx_coroutines_core(Job.kt:734)
at kotlinx.coroutines.experimental.JobNode.dispose(Job.kt:995)
at kotlinx.coroutines.experimental.selects.SelectBuilderImpl.initCancellability(Select.kt:294)
at kotlinx.coroutines.experimental.selects.SelectBuilderImpl.getResult(Select.kt:273)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index 97daf79..ff93585 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -227,13 +227,14 @@
// ------ removeXXX ------
/**
- * Removes this node from the list. Returns `true` when removed successfully.
+ * Removes this node from the list. Returns `true` when removed successfully, or `false` if the node was already
+ * removed or if it was not added to any list in the first place.
*/
public open fun remove(): Boolean {
while (true) { // lock-free loop on next
val next = this.next
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
- check(next !== this) // sanity check -- can be true for sentinel nodes only, but they are never removed
+ if (next === this) return false // was not even added
val removed = (next as Node).removed()
if (NEXT.compareAndSet(this, next, removed)) {
// was removed successfully (linearized remove) -- fixup the list
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt
new file mode 100644
index 0000000..648d35a
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/JobDisposeTest.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.junit.Test
+import kotlin.concurrent.thread
+
+/**
+ * Tests concurrent cancel & dispose of the jobs.
+ */
+class JobDisposeTest: TestBase() {
+ private val TEST_DURATION = 3 * stressTestMultiplier // seconds
+
+ @Volatile
+ private var done = false
+ @Volatile
+ private var job: TestJob? = null
+ @Volatile
+ private var handle: DisposableHandle? = null
+
+ @Volatile
+ private var exception: Throwable? = null
+
+ fun testThread(name: String, block: () -> Unit): Thread =
+ thread(start = false, name = name, block = block).apply {
+ setUncaughtExceptionHandler { t, e ->
+ exception = e
+ println("Exception in ${t.name}: $e")
+ e.printStackTrace()
+ }
+ }
+
+ @Test
+ fun testConcurrentDispose() {
+ // create threads
+ val threads = mutableListOf<Thread>()
+ threads += testThread("creator") {
+ while (!done) {
+ val job = TestJob()
+ val handle = job.invokeOnCancellation { /* nothing */ }
+ this.job = job // post job to cancelling thread
+ this.handle = handle // post handle to concurrent disposer thread
+ handle.dispose() // dispose of handle from this thread (concurrently with other disposer)
+ }
+ }
+ threads += testThread("canceller") {
+ var prevJob: Job? = null
+ while (!done) {
+ val job = this.job ?: continue
+ val result = job.cancel()
+ if (job != prevJob) {
+ check(result) // must have returned true
+ prevJob = job
+ } else
+ check(!result) // must have returned false
+ }
+ }
+ threads += testThread("disposer") {
+ while (!done) {
+ handle?.dispose()
+ }
+ }
+ // start threads
+ threads.forEach { it.start() }
+ // wait
+ for (i in 1..TEST_DURATION) {
+ println("$i: Running")
+ Thread.sleep(1000)
+ if (exception != null) break
+ }
+ // done
+ done = true
+ // join threads
+ threads.forEach { it.join() }
+ // rethrow exception if any
+ exception?.let { throw it }
+ }
+
+ class TestJob : JobSupport(active = true) {
+ // The bug was triggering only with cancelling state
+ override val hasCancellingState: Boolean
+ get() = true
+ }
+}
\ No newline at end of file