Allow to shutdown executor from withing worker thread
Fixed #612
diff --git a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
index ea23ada..37e1932 100644
--- a/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
@@ -300,6 +300,14 @@
fun shutdown(timeout: Long) {
// atomically set termination flag which is checked when workers are added or removed
if (!isTerminated.compareAndSet(false, true)) return
+
+ /*
+ * Shutdown current thread. Note that shutdown is testing utility,
+ * so we don't do anything special to properly verify that no tasks are submitted after close()
+ */
+ val thread = Thread.currentThread()
+ (thread as? Worker)?.tryReleaseCpu(WorkerState.TERMINATED)
+
// Capture # of created workers that cannot change anymore (mind the synchronized block!)
val created = synchronized(workers) { createdWorkers }
for (i in 1..created) {
@@ -653,7 +661,7 @@
* Releases CPU token if worker has any and changes state to [newState]
* @return whether worker had CPU token
*/
- private fun tryReleaseCpu(newState: WorkerState): Boolean {
+ internal fun tryReleaseCpu(newState: WorkerState): Boolean {
val previousState = state
val hadCpu = previousState == WorkerState.CPU_ACQUIRED
if (hadCpu) cpuPermits.release()
diff --git a/core/kotlinx-coroutines-core/test/scheduling/CoroutineSchedulerTest.kt b/core/kotlinx-coroutines-core/test/scheduling/CoroutineSchedulerTest.kt
index de99f2c..7b39e69 100644
--- a/core/kotlinx-coroutines-core/test/scheduling/CoroutineSchedulerTest.kt
+++ b/core/kotlinx-coroutines-core/test/scheduling/CoroutineSchedulerTest.kt
@@ -5,8 +5,10 @@
package kotlinx.coroutines.experimental.scheduling
import kotlinx.coroutines.experimental.TestBase
-import org.junit.Test
-import java.util.concurrent.CountDownLatch
+import org.junit.*
+import java.lang.Runnable
+import java.util.concurrent.*
+import kotlin.coroutines.experimental.*
class CoroutineSchedulerTest : TestBase() {
@@ -115,6 +117,16 @@
ExperimentalCoroutineDispatcher(4, 1)
}
+ @Test
+ fun testSelfClose() {
+ val dispatcher = ExperimentalCoroutineDispatcher(1, 1)
+ val latch = CountDownLatch(1)
+ dispatcher.dispatch(EmptyCoroutineContext, Runnable {
+ dispatcher.close(); latch.countDown()
+ })
+ latch.await()
+ }
+
private fun testUniformDistribution(worker: CoroutineScheduler.Worker, bound: Int) {
val result = IntArray(bound)
val iterations = 10_000_000