Move undispatched thread-local event loop to dispatched continuation, execute all undispatched tasks in that loop to avoid SOE in user-supplied unconfined dispatchers and 'immediate' main dispatchers.
diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
index a861d4d..cdd6f15 100644
--- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
@@ -9,13 +9,95 @@
import kotlin.jvm.*
@Suppress("PrivatePropertyName")
-private val UNDEFINED = Symbol("UNDEFINED")
+@JvmField
+internal val UNDEFINED = Symbol("UNDEFINED")
+
+@NativeThreadLocal
+internal object UndispatchedEventLoop {
+ data class State(
+ @JvmField var isActive: Boolean = false,
+ @JvmField val threadLocalQueue: ArrayList<Runnable> = ArrayList()
+ )
+
+ @JvmField
+ internal val state = CommonThreadLocal { State() }
+
+ fun dispatch(block: Runnable) {
+ val state = state.get()
+ if (state.isActive) {
+ state.threadLocalQueue.add(block)
+ return
+ }
+
+ try {
+ state.isActive = true
+ block.run()
+ while (!state.threadLocalQueue.isEmpty()) {
+ val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
+ element.run()
+ }
+ } catch (e: Throwable) {
+ /*
+ * This exception doesn't happen normally, only if user either submitted throwing runnable
+ * or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
+ */
+ state.threadLocalQueue.clear()
+ throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
+ } finally {
+ state.isActive = false
+ }
+ }
+
+ inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) {
+ val state = state.get()
+ if (state.isActive) {
+ continuation._state = contState
+ continuation.resumeMode = mode
+ state.threadLocalQueue.add(continuation)
+ return
+ }
+
+ runLoop(state, block)
+ }
+
+ inline fun execute(task: DispatchedTask<*>, block: () -> Unit) {
+ val state = state.get()
+ if (state.isActive) {
+ state.threadLocalQueue.add(task)
+ return
+ }
+
+ runLoop(state, block)
+ }
+
+ inline fun runLoop(state: State, block: () -> Unit) {
+ try {
+ state.isActive = true
+ block()
+ while (!state.threadLocalQueue.isEmpty()) {
+ val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
+ element.run()
+ }
+ } catch (e: Throwable) {
+ /*
+ * This exception doesn't happen normally, only if user either submitted throwing runnable
+ * or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
+ */
+ state.threadLocalQueue.clear()
+ throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
+ } finally {
+ state.isActive = false
+ }
+ }
+}
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {
- private var _state: Any? = UNDEFINED
+ @JvmField
+ @Suppress("PropertyName")
+ internal var _state: Any? = UNDEFINED
public override var resumeMode: Int = 0
override fun takeState(): Any? {
@@ -30,25 +112,31 @@
override fun resumeWith(result: Result<T>) {
val context = continuation.context
+ val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
- _state = result.toState()
+ _state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
- resumeUndispatchedWith(result)
+ UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) {
+ withCoroutineContext(this.context) {
+ continuation.resumeWith(result)
+ }
+ }
}
}
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
- val context = continuation.context
if (dispatcher.isDispatchNeeded(context)) {
_state = value
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
- if (!resumeCancelled()) {
- resumeUndispatched(value)
+ UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
+ if (!resumeCancelled()) {
+ resumeUndispatched(value)
+ }
}
}
}
@@ -56,13 +144,16 @@
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellableWithException(exception: Throwable) {
val context = continuation.context
+ val state = CompletedExceptionally(exception)
if (dispatcher.isDispatchNeeded(context)) {
_state = CompletedExceptionally(exception)
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
- if (!resumeCancelled()) {
- resumeUndispatchedWithException(exception)
+ UndispatchedEventLoop.execute(this, state, MODE_CANCELLABLE) {
+ if (!resumeCancelled()) {
+ resumeUndispatchedWithException(exception)
+ }
}
}
}
@@ -79,13 +170,6 @@
}
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
- inline fun resumeUndispatchedWith(result: Result<T>) {
- withCoroutineContext(context) {
- continuation.resumeWith(result)
- }
- }
-
- @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatched(value: T) {
withCoroutineContext(context) {
continuation.resume(value)
@@ -182,12 +266,15 @@
useMode = MODE_UNDISPATCHED
}
}
- // slow-path - use delegate
- val state = takeState()
- val exception = getExceptionalResult(state)
- if (exception != null) {
- delegate.resumeWithExceptionMode(exception, useMode)
- } else {
- delegate.resumeMode(getSuccessfulResult(state), useMode)
+
+ UndispatchedEventLoop.execute(this) {
+ // slow-path - use delegate
+ val state = takeState()
+ val exception = getExceptionalResult(state)
+ if (exception != null) {
+ delegate.resumeWithExceptionMode(exception, useMode)
+ } else {
+ delegate.resumeMode(getSuccessfulResult(state), useMode)
+ }
}
}
diff --git a/common/kotlinx-coroutines-core-common/src/Unconfined.kt b/common/kotlinx-coroutines-core-common/src/Unconfined.kt
index bf4f75c..83e27a5 100644
--- a/common/kotlinx-coroutines-core-common/src/Unconfined.kt
+++ b/common/kotlinx-coroutines-core-common/src/Unconfined.kt
@@ -4,44 +4,13 @@
package kotlinx.coroutines
-import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
-import kotlin.jvm.*
/**
* A coroutine dispatcher that is not confined to any specific thread.
*/
-@NativeThreadLocal
internal object Unconfined : CoroutineDispatcher() {
- private data class State(@JvmField var isActive: Boolean = false,
- @JvmField val threadLocalQueue: ArrayList<Runnable> = ArrayList())
- private val state = CommonThreadLocal { State() }
-
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- // Stack-based event loop on top of thread-local arraylist
- val state = state.get()
- if (state.isActive) {
- state.threadLocalQueue.add(block)
- return
- }
-
- try {
- state.isActive = true
- block.run()
- while (!state.threadLocalQueue.isEmpty()) {
- val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
- element.run()
- }
- } catch (e: Throwable) {
- /*
- * This exception doesn't happen normally, only if user either submitted throwing runnable
- * or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
- */
- state.threadLocalQueue.clear()
- throw DispatchException("Unexpected exception in Unconfined loop, clearing pending tasks", e)
- } finally {
- state.isActive = false
- }
- }
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
+ override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
override fun toString(): String = "Unconfined"
}