Do not step into thread-local event-loop on CancellableContinuation#dispatch fast-path
diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
index 896492a..6fc1870 100644
--- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
@@ -33,14 +33,14 @@
runLoop(state, block)
}
- inline fun execute(task: DispatchedTask<*>, block: () -> Unit) {
+ fun resumeUndispatched(task: DispatchedTask<*>) {
val state = state.get()
if (state.isActive) {
state.threadLocalQueue.addLast(task)
return
}
- runLoop(state, block)
+ runLoop(state, { task.resume(task.delegate, MODE_UNDISPATCHED) })
}
inline fun runLoop(state: State, block: () -> Unit) {
@@ -226,7 +226,6 @@
}
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
- var useMode = mode
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
@@ -234,20 +233,21 @@
val context = delegate.context
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
- return // and that's it -- dispatched via fast-path
} else {
- useMode = MODE_UNDISPATCHED
+ UndispatchedEventLoop.resumeUndispatched(this)
}
+ } else {
+ resume(delegate, mode)
}
+}
- UndispatchedEventLoop.execute(this) {
- // slow-path - use delegate
- val state = takeState()
- val exception = getExceptionalResult(state)
- if (exception != null) {
- delegate.resumeWithExceptionMode(exception, useMode)
- } else {
- delegate.resumeMode(getSuccessfulResult(state), useMode)
- }
+internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
+ // slow-path - use delegate
+ val state = takeState()
+ val exception = getExceptionalResult(state)
+ if (exception != null) {
+ delegate.resumeWithExceptionMode(exception, useMode)
+ } else {
+ delegate.resumeMode(getSuccessfulResult(state), useMode)
}
}
diff --git a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
index 6f4249c..41ed102 100644
--- a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
+++ b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
@@ -103,4 +103,18 @@
finish(4)
}
}
+
+ @Test
+ fun testNestedRunBlocking() = runBlocking {
+ delay(100)
+ val value = runBlocking {
+ delay(100)
+ runBlocking {
+ delay(100)
+ 1
+ }
+ }
+
+ assertEquals(1, value)
+ }
}
diff --git a/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt b/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt
index dc004a8..cf4fcb4 100644
--- a/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt
+++ b/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt
@@ -137,6 +137,44 @@
}.await()
}
+ @Test
+ fun testBlockingFunctionWithRunBlocking() = withTestContext(injectedContext) {
+ val delay = 1000L
+ val expectedValue = 16
+ val result = runBlocking {
+ suspendedBlockingFunction(delay) {
+ expectedValue
+ }
+ }
+ assertEquals(expectedValue, result)
+ assertEquals(delay, now())
+ }
+
+ @Test
+ fun testBlockingFunctionWithAsync() = withTestContext(injectedContext) {
+ val delay = 1000L
+ val expectedValue = 16
+ var now = 0L
+ val deferred = async {
+ suspendedBlockingFunction(delay) {
+ expectedValue
+ }
+ }
+ now += advanceTimeBy((delay / 4) - 1)
+ assertEquals((delay / 4) - 1, now)
+ assertEquals(now, now())
+ try {
+ deferred.getCompleted()
+ fail("The Job should not have been completed yet.")
+ } catch (e: Exception) {
+ // Success.
+ }
+ now += advanceTimeBy(1)
+ assertEquals(delay, now())
+ assertEquals(now, now())
+ assertEquals(expectedValue, deferred.getCompleted())
+ }
+
private suspend fun <T> TestCoroutineContext.suspendedBlockingFunction(delay: Long, function: () -> T): T {
delay(delay / 4)
return runBlocking {