Fixed hangs during thread termination in IO dispatcher

Fixes #524
Fixes #525
diff --git a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
index 2fbda69..79df560 100644
--- a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
@@ -111,21 +111,19 @@
      *
      * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
      */
-    private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) {
+    private fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
         parkedWorkersStack.loop { top ->
             val index = (top and PARKED_INDEX_MASK).toInt()
             val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
             val updIndex = if (index == oldIndex) {
                 if (newIndex == 0) {
-                    parkedWorkersStackNextIndex(workers[oldIndex]!!)
-                }
-                else {
+                    parkedWorkersStackNextIndex(worker)
+                } else {
                     newIndex
                 }
             } else {
                 index // no change to index, but update version
             }
-
             if (updIndex < 0) return@loop // retry
             if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
         }
@@ -254,7 +252,7 @@
         private const val MAX_SPINS = 1000
         private const val MAX_YIELDS = MAX_SPINS + 500
         
-        @JvmStatic // Note, that is fits into Int (it is is equal to 10^9)
+        @JvmStatic // Note, that is fits into Int (it is equal to 10^9)
         private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1).toInt()
 
         @JvmStatic
@@ -824,10 +822,7 @@
                  * See tryUnpark for state reasoning.
                  * If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
                  */
-                if (!terminationState.compareAndSet(
-                        ALLOWED,
-                        TERMINATED
-                    )) return
+                if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return
                 /*
                  * At this point this thread is no longer considered as usable for scheduling.
                  * We need multi-step choreography to reindex workers.
@@ -843,22 +838,25 @@
                  * 2) Update top of stack if it was pointing to oldIndex and make sure no
                  *    pending push/pop operation that might have already retrieved oldIndex could complete.
                  */
-                parkedWorkersStackTopUpdate(oldIndex, 0)
+                parkedWorkersStackTopUpdate(this, oldIndex, 0)
                 /*
-                 * 3) Move last worker into an index in array that was previously occupied by this worker.
+                 * 3) Move last worker into an index in array that was previously occupied by this worker,
+                 *    if last worker was a different one (sic!).
                  */
                 val lastIndex = decrementCreatedWorkers()
-                val lastWorker = workers[lastIndex]!!
-                workers[oldIndex] = lastWorker
-                lastWorker.indexInArray = oldIndex
-                /*
-                 * Now lastWorker is available at both indices in the array, but it can
-                 * still be at the stack top on via its lastIndex
-                 *
-                 * 4) Update top of stack lastIndex -> oldIndex and make sure no
-                 *    pending push/pop operation that might have already retrieved lastIndex could complete.
-                 */
-                parkedWorkersStackTopUpdate(lastIndex, oldIndex)
+                if (lastIndex != oldIndex) {
+                    val lastWorker = workers[lastIndex]!!
+                    workers[oldIndex] = lastWorker
+                    lastWorker.indexInArray = oldIndex
+                    /*
+                     * Now lastWorker is available at both indices in the array, but it can
+                     * still be at the stack top on via its lastIndex
+                     *
+                     * 4) Update top of stack lastIndex -> oldIndex and make sure no
+                     *    pending push/pop operation that might have already retrieved lastIndex could complete.
+                     */
+                    parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
+                }
                 /*
                  * 5) It is safe to clear reference from workers array now.
                  */
diff --git a/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt b/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt
new file mode 100644
index 0000000..48e2653
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/scheduling/BlockingIOTerminationStressTest.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlinx.coroutines.experimental.scheduling.*
+import org.junit.*
+import java.util.*
+import java.util.concurrent.*
+
+class BlockingIOTerminationStressTest : TestBase() {
+    private val baseDispatcher = ExperimentalCoroutineDispatcher(
+        2, 20,
+        TimeUnit.MILLISECONDS.toNanos(10)
+    )
+    private val ioDispatcher = baseDispatcher.blocking()
+    private val TEST_SECONDS = 3L * stressTestMultiplier
+
+    @After
+    fun tearDown() {
+        baseDispatcher.close()
+    }
+
+    @Test
+    fun testTermination() {
+        val rnd = Random()
+        val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(TEST_SECONDS)
+        while (System.currentTimeMillis() < deadline) {
+            Thread.sleep(rnd.nextInt(30).toLong())
+            repeat(rnd.nextInt(5) + 1) {
+                launch(ioDispatcher) {
+                    Thread.sleep(rnd.nextInt(5).toLong())
+                }
+            }
+        }
+    }
+}