Restore the logic of having at least two CPU workers when corePoolSize > 1
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index 08e5bf5..6aef8c8 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -366,9 +366,16 @@
val created = createdWorkers(state)
val blocking = blockingWorkers(state)
val cpuWorkers = created - blocking
- // If most of created workers are blocking, we should create one more thread to handle non-blocking work
- if (cpuWorkers < corePoolSize && createNewWorker()) {
- return
+ /*
+ * We check how many threads are there to handle non-blocking work,
+ * and create one more if we have not enough of them.
+ */
+ if (cpuWorkers < corePoolSize) {
+ val newCpuWorkers = createNewWorker()
+ // If we've created the first cpu worker and corePoolSize > 1 then create
+ // one more (second) cpu worker, so that stealing between them is operational
+ if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
+ if (newCpuWorkers > 0) return
}
// Try unpark again in case there was race between permit release and parking
tryUnpark()
@@ -418,7 +425,11 @@
}
}
- private fun createNewWorker(): Boolean {
+ /*
+ * Returns the number of CPU workers after this function (including new worker) or
+ * 0 if no worker was created.
+ */
+ private fun createNewWorker(): Int {
synchronized(workers) {
// for test purposes make sure we're not trying to resurrect terminated scheduler
require(!isTerminated.value) { "This scheduler was terminated "}
@@ -427,14 +438,14 @@
val blocking = blockingWorkers(state)
val cpuWorkers = created - blocking
// Double check for overprovision
- if (cpuWorkers >= corePoolSize) return false
- if (created >= maxPoolSize || cpuPermits.availablePermits() == 0) return false
+ if (cpuWorkers >= corePoolSize) return 0
+ if (created >= maxPoolSize || cpuPermits.availablePermits() == 0) return 0
// start & register new worker
val newIndex = incrementCreatedWorkers()
require(newIndex > 0 && workers[newIndex] == null)
val worker = Worker(newIndex).apply { start() }
workers[newIndex] = worker
- return true
+ return cpuWorkers + 1
}
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt
index 1dd59c7..188c60e 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt
@@ -90,6 +90,8 @@
return _dispatcher!!.blocking(parallelism) + handler
}
+ fun initialPoolSize() = corePoolSize.coerceAtMost(2)
+
@After
fun after() {
runBlocking {