Use FIFO queue in undispatched event loop
diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
index cdd6f15..896492a 100644
--- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
@@ -9,51 +9,24 @@
import kotlin.jvm.*
@Suppress("PrivatePropertyName")
-@JvmField
-internal val UNDEFINED = Symbol("UNDEFINED")
+private val UNDEFINED = Symbol("UNDEFINED")
@NativeThreadLocal
internal object UndispatchedEventLoop {
data class State(
@JvmField var isActive: Boolean = false,
- @JvmField val threadLocalQueue: ArrayList<Runnable> = ArrayList()
+ @JvmField val threadLocalQueue: ArrayQueue<Runnable> = ArrayQueue()
)
@JvmField
internal val state = CommonThreadLocal { State() }
- fun dispatch(block: Runnable) {
- val state = state.get()
- if (state.isActive) {
- state.threadLocalQueue.add(block)
- return
- }
-
- try {
- state.isActive = true
- block.run()
- while (!state.threadLocalQueue.isEmpty()) {
- val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
- element.run()
- }
- } catch (e: Throwable) {
- /*
- * This exception doesn't happen normally, only if user either submitted throwing runnable
- * or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
- */
- state.threadLocalQueue.clear()
- throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
- } finally {
- state.isActive = false
- }
- }
-
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) {
val state = state.get()
if (state.isActive) {
continuation._state = contState
continuation.resumeMode = mode
- state.threadLocalQueue.add(continuation)
+ state.threadLocalQueue.addLast(continuation)
return
}
@@ -63,7 +36,7 @@
inline fun execute(task: DispatchedTask<*>, block: () -> Unit) {
val state = state.get()
if (state.isActive) {
- state.threadLocalQueue.add(task)
+ state.threadLocalQueue.addLast(task)
return
}
@@ -74,8 +47,8 @@
try {
state.isActive = true
block()
- while (!state.threadLocalQueue.isEmpty()) {
- val element = state.threadLocalQueue.removeAt(state.threadLocalQueue.lastIndex)
+ while (!state.threadLocalQueue.isEmpty) {
+ val element = state.threadLocalQueue.removeFirst()
element.run()
}
} catch (e: Throwable) {
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index 1392614..f32c66a 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -1072,7 +1072,7 @@
*/
internal suspend fun awaitInternal(): Any? {
// fast-path -- check state (avoid extra object creation)
- while(true) { // lock-free loop on state
+ while (true) { // lock-free loop on state
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
diff --git a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt
new file mode 100644
index 0000000..be38183
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+internal class ArrayQueue<T : Any> {
+ public val isEmpty: Boolean get() = head == tail
+
+ private var elements = arrayOfNulls<Any>(16)
+ private var head = 0
+ private var tail = 0
+
+ public fun addLast(element: T) {
+ elements[tail] = element
+ tail = (tail + 1) and elements.size - 1
+ if (tail == head) ensureCapacity()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ public fun removeFirst(): T {
+ require(head != tail) { "Queue is empty" }
+ val element = elements[head]
+ elements[head] = null
+ head = (head + 1) and elements.size - 1
+ return element!! as T
+ }
+
+ public fun clear() {
+ head = 0
+ tail = 0
+ elements = arrayOfNulls(elements.size)
+ }
+
+ private fun ensureCapacity() {
+ val currentSize = elements.size
+ val newCapacity = currentSize shl 1
+ val newElements = arrayOfNulls<Any>(newCapacity)
+ val remaining = elements.size - head
+ arraycopy(elements, head, newElements, 0, remaining)
+ arraycopy(elements, 0, newElements, remaining, head)
+ elements = newElements
+ head = 0
+ tail = currentSize
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
index a32350f..8866057 100644
--- a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
@@ -16,11 +16,11 @@
launch {
expect(4)
launch {
- expect(7)
+ expect(6)
}
launch {
- expect(6)
+ expect(7)
}
expect(5)
}
diff --git a/core/kotlinx-coroutines-core/test/UnconfinedConcurrentTest.kt b/core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt
similarity index 89%
rename from core/kotlinx-coroutines-core/test/UnconfinedConcurrentTest.kt
rename to core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt
index 24111e2..4fe1fd8 100644
--- a/core/kotlinx-coroutines-core/test/UnconfinedConcurrentTest.kt
+++ b/core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt
@@ -9,9 +9,9 @@
import java.util.concurrent.*
import kotlin.test.*
-class UnconfinedConcurrentTest : TestBase() {
+class UnconfinedConcurrentStressTest : TestBase() {
private val threads = 4
- private val executor = newFixedThreadPoolContext(threads, "UnconfinedConcurrentTest")
+ private val executor = newFixedThreadPoolContext(threads, "UnconfinedConcurrentStressTest")
private val threadLocal = ThreadLocal<Int>()
@After
@@ -21,7 +21,7 @@
@Test(timeout = 10_000L)
fun testConcurrent() = runTest {
- val iterations = 10_000
+ val iterations = 10_000 * stressTestMultiplier
val startBarrier = CyclicBarrier(threads + 1)
val finishLatch = CountDownLatch(threads)
diff --git a/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt b/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt
index db40ac3..dc004a8 100644
--- a/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt
+++ b/core/kotlinx-coroutines-core/test/test/TestCoroutineContextTest.kt
@@ -137,49 +137,6 @@
}.await()
}
- @Test
- fun testBlockingFunctionWithRunBlocking() = withTestContext(injectedContext) {
- val delay = 1000L
- val expectedValue = 16
-
- val result = runBlocking {
- suspendedBlockingFunction(delay) {
- expectedValue
- }
- }
-
- assertEquals(expectedValue, result)
- assertEquals(delay, now())
- }
-
- @Test
- fun testBlockingFunctionWithAsync() = withTestContext(injectedContext) {
- val delay = 1000L
- val expectedValue = 16
- var now = 0L
-
- val deferred = async {
- suspendedBlockingFunction(delay) {
- expectedValue
- }
- }
-
- now += advanceTimeBy((delay / 4) - 1)
- assertEquals((delay / 4) - 1, now)
- assertEquals(now, now())
- try {
- deferred.getCompleted()
- fail("The Job should not have been completed yet.")
- } catch (e: Exception) {
- // Success.
- }
-
- now += advanceTimeBy(1)
- assertEquals(delay, now())
- assertEquals(now, now())
- assertEquals(expectedValue, deferred.getCompleted())
- }
-
private suspend fun <T> TestCoroutineContext.suspendedBlockingFunction(delay: Long, function: () -> T): T {
delay(delay / 4)
return runBlocking {