Fixed hangs during thread termination in IO dispatcher
Fixes #524
Fixes #525
diff --git a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
index 2fbda69..79df560 100644
--- a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
@@ -111,21 +111,19 @@
*
* Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
*/
- private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) {
+ private fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
parkedWorkersStack.loop { top ->
val index = (top and PARKED_INDEX_MASK).toInt()
val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
val updIndex = if (index == oldIndex) {
if (newIndex == 0) {
- parkedWorkersStackNextIndex(workers[oldIndex]!!)
- }
- else {
+ parkedWorkersStackNextIndex(worker)
+ } else {
newIndex
}
} else {
index // no change to index, but update version
}
-
if (updIndex < 0) return@loop // retry
if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
}
@@ -254,7 +252,7 @@
private const val MAX_SPINS = 1000
private const val MAX_YIELDS = MAX_SPINS + 500
- @JvmStatic // Note, that is fits into Int (it is is equal to 10^9)
+ @JvmStatic // Note, that is fits into Int (it is equal to 10^9)
private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1).toInt()
@JvmStatic
@@ -824,10 +822,7 @@
* See tryUnpark for state reasoning.
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
*/
- if (!terminationState.compareAndSet(
- ALLOWED,
- TERMINATED
- )) return
+ if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return
/*
* At this point this thread is no longer considered as usable for scheduling.
* We need multi-step choreography to reindex workers.
@@ -843,22 +838,25 @@
* 2) Update top of stack if it was pointing to oldIndex and make sure no
* pending push/pop operation that might have already retrieved oldIndex could complete.
*/
- parkedWorkersStackTopUpdate(oldIndex, 0)
+ parkedWorkersStackTopUpdate(this, oldIndex, 0)
/*
- * 3) Move last worker into an index in array that was previously occupied by this worker.
+ * 3) Move last worker into an index in array that was previously occupied by this worker,
+ * if last worker was a different one (sic!).
*/
val lastIndex = decrementCreatedWorkers()
- val lastWorker = workers[lastIndex]!!
- workers[oldIndex] = lastWorker
- lastWorker.indexInArray = oldIndex
- /*
- * Now lastWorker is available at both indices in the array, but it can
- * still be at the stack top on via its lastIndex
- *
- * 4) Update top of stack lastIndex -> oldIndex and make sure no
- * pending push/pop operation that might have already retrieved lastIndex could complete.
- */
- parkedWorkersStackTopUpdate(lastIndex, oldIndex)
+ if (lastIndex != oldIndex) {
+ val lastWorker = workers[lastIndex]!!
+ workers[oldIndex] = lastWorker
+ lastWorker.indexInArray = oldIndex
+ /*
+ * Now lastWorker is available at both indices in the array, but it can
+ * still be at the stack top on via its lastIndex
+ *
+ * 4) Update top of stack lastIndex -> oldIndex and make sure no
+ * pending push/pop operation that might have already retrieved lastIndex could complete.
+ */
+ parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
+ }
/*
* 5) It is safe to clear reference from workers array now.
*/
diff --git a/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt b/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt
new file mode 100644
index 0000000..48e2653
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlinx.coroutines.experimental.scheduling.*
+import org.junit.*
+import java.util.*
+import java.util.concurrent.*
+
+class BlockingIOTerminationStressTest : TestBase() {
+ private val baseDispatcher = ExperimentalCoroutineDispatcher(
+ 2, 20,
+ TimeUnit.MILLISECONDS.toNanos(10)
+ )
+ private val ioDispatcher = baseDispatcher.blocking()
+ private val TEST_SECONDS = 3L * stressTestMultiplier
+
+ @After
+ fun tearDown() {
+ baseDispatcher.close()
+ }
+
+ @Test
+ fun testTermination() {
+ val rnd = Random()
+ val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(TEST_SECONDS)
+ while (System.currentTimeMillis() < deadline) {
+ Thread.sleep(rnd.nextInt(30).toLong())
+ repeat(rnd.nextInt(5) + 1) {
+ launch(ioDispatcher) {
+ Thread.sleep(rnd.nextInt(5).toLong())
+ }
+ }
+ }
+ }
+}