Execute EventLoop#invokeOnTimeout in DefaultDispatcher to allow busy-wait loops
Fixes #479
diff --git a/core/kotlinx-coroutines-core/src/DefaultExecutor.kt b/core/kotlinx-coroutines-core/src/DefaultExecutor.kt
index 64a9a88..ab2b0bd 100644
--- a/core/kotlinx-coroutines-core/src/DefaultExecutor.kt
+++ b/core/kotlinx-coroutines-core/src/DefaultExecutor.kt
@@ -38,6 +38,17 @@
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
}
+ /**
+ * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
+ * ```
+ * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
+ * ```
+ *
+ * Livelock is possible only if runBlocking is called on [DefaultDispatcher], but it's not exposed as public API
+ */
+ override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
+ DelayedRunnableTask(time, unit, block).also { schedule(it) }
+
override fun run() {
timeSource.registerTimeLoopThread()
try {
diff --git a/core/kotlinx-coroutines-core/src/EventLoop.kt b/core/kotlinx-coroutines-core/src/EventLoop.kt
index 6a88992..500179b4 100644
--- a/core/kotlinx-coroutines-core/src/EventLoop.kt
+++ b/core/kotlinx-coroutines-core/src/EventLoop.kt
@@ -9,7 +9,6 @@
import kotlinx.coroutines.experimental.timeunit.*
import java.util.concurrent.locks.*
import kotlin.coroutines.experimental.*
-import kotlin.jvm.*
/**
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -111,9 +110,6 @@
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
schedule(DelayedResumeTask(time, unit, continuation))
- override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
- DelayedRunnableTask(time, unit, block).also { schedule(it) }
-
override fun processNextEvent(): Long {
if (!isCorrectThread()) return Long.MAX_VALUE
// queue all delayed tasks that are due to be executed
@@ -141,8 +137,9 @@
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
- } else
+ } else {
DefaultExecutor.execute(task)
+ }
}
@Suppress("UNCHECKED_CAST")
@@ -266,23 +263,26 @@
fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
- fun rescheduleOnShutdown() = synchronized(this) {
+ @Synchronized
+ fun rescheduleOnShutdown() {
if (state != DELAYED) return
if (_delayed.value!!.remove(this)) {
state = RESCHEDULED
DefaultExecutor.schedule(this)
- } else
- state = REMOVED
- }
-
- final override fun dispose() = synchronized(this) {
- when (state) {
- DELAYED -> _delayed.value?.remove(this)
- RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
- else -> return
- }
+ } else {
state = REMOVED
}
+ }
+
+ @Synchronized
+ final override fun dispose() {
+ when (state) {
+ DELAYED -> _delayed.value?.remove(this)
+ RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
+ else -> return
+ }
+ state = REMOVED
+ }
override fun toString(): String = "Delayed[nanos=$nanoTime]"
}
@@ -302,7 +302,8 @@
}
}
- private inner class DelayedRunnableTask(
+ // Cannot be moved to DefaultExecutor due to BE bug
+ internal inner class DelayedRunnableTask(
time: Long, timeUnit: TimeUnit,
private val block: Runnable
) : DelayedTask(time, timeUnit) {
diff --git a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
index 2df50e9..f2c1a39 100644
--- a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
+++ b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
@@ -8,6 +8,19 @@
import kotlin.test.*
class RunBlockingTest : TestBase() {
+
+ @Test
+ fun testWithTimeoutBusyWait() = runBlocking {
+ val value = withTimeoutOrNull(10) {
+ while (isActive) {
+ // Busy wait
+ }
+ "value"
+ }
+
+ assertEquals("value", value)
+ }
+
@Test
fun testPrivateEventLoop() {
expect(1)