Avoid termination races by checking blocking tasks quiescence
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index 8803f8a..10a7966 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -636,6 +636,8 @@
parkedWorkersStack.push(this)
}
+ if (!blockingQuiescence()) return
+
terminationState.value = ALLOWED
val time = System.nanoTime()
LockSupport.parkNanos(IDLE_WORKER_KEEP_ALIVE_NS)
@@ -649,14 +651,6 @@
* Stops execution of current thread and removes it from [createdWorkers]
*/
private fun terminateWorker() {
- // Last ditch polling: try to find blocking task before termination
- val task = globalWorkQueue.pollBlockingMode()
- if (task != null) {
- localQueue.add(task, globalWorkQueue)
- return
- }
-
-
synchronized(workers) {
// Someone else terminated, bail out
if (createdWorkers <= corePoolSize) {
@@ -671,6 +665,9 @@
return
}
+ // Last ditch polling: try to find blocking task before termination
+ if (!blockingQuiescence()) return
+
/*
* See tryUnpark for state reasoning.
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate
@@ -690,6 +687,18 @@
state = WorkerState.FINISHED
}
+ /**
+ * Method checks whether new blocking tasks arrived to pool when worker decided
+ * it can go to deep park/termination and puts recently arrived task to its local queue
+ */
+ private fun blockingQuiescence(): Boolean {
+ globalWorkQueue.pollBlockingMode()?.let {
+ localQueue.add(it, globalWorkQueue)
+ return false
+ }
+ return true
+ }
+
private fun idleReset(mode: TaskMode) {
if (state == WorkerState.PARKING) {
assert(mode == TaskMode.PROBABLY_BLOCKING)