Merge branch 'master' into develop
diff --git a/build.gradle b/build.gradle
index ce77c0a..24145df 100644
--- a/build.gradle
+++ b/build.gradle
@@ -52,7 +52,15 @@
kotlin_version = '1.2-SNAPSHOT'
}
+ def name = it.name
repositories {
+ /*
+ * google should be first in the repository list because some of the play services
+ * transitive dependencies was removed from jcenter, thus breaking gradle dependency resolution
+ */
+ if (name == "kotlinx-coroutines-play-services") {
+ google()
+ }
jcenter()
maven { url "https://kotlin.bintray.com/kotlin-eap" }
maven { url "https://kotlin.bintray.com/kotlinx" }
diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
index 32ee51f..d7f7a97 100644
--- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt
@@ -21,26 +21,40 @@
@JvmField
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }
- inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) {
+ /**
+ * Executes given [block] as part of current event loop, updating related to block [continuation]
+ * mode and state if continuation is not resumed immediately.
+ * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
+ * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
+ */
+ inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
+ doYield: Boolean = false, block: () -> Unit) : Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
+ // If we are yielding and queue is empty, we can bail out as part of fast path
+ if (doYield && eventLoop.queue.isEmpty) {
+ return false
+ }
+
continuation._state = contState
continuation.resumeMode = mode
eventLoop.queue.addLast(continuation)
- return
+ return true
}
runEventLoop(eventLoop, block)
+ return false
}
- fun resumeUndispatched(task: DispatchedTask<*>) {
+ fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
eventLoop.queue.addLast(task)
- return
+ return true
}
runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
+ return false
}
inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) {
@@ -227,6 +241,11 @@
}
}
+internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
+ UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) {
+ run()
+ }
+
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
diff --git a/common/kotlinx-coroutines-core-common/src/Yield.kt b/common/kotlinx-coroutines-core-common/src/Yield.kt
index 632dcba..78ab27f 100644
--- a/common/kotlinx-coroutines-core-common/src/Yield.kt
+++ b/common/kotlinx-coroutines-core-common/src/Yield.kt
@@ -19,7 +19,9 @@
val context = uCont.context
context.checkCompletion()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
- if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
+ if (!cont.dispatcher.isDispatchNeeded(context)) {
+ return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
+ }
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Channel.kt b/common/kotlinx-coroutines-core-common/src/channels/Channel.kt
index 81dc89c..8cf8572 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Channel.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Channel.kt
@@ -210,10 +210,9 @@
* **Note: This is an obsolete api.**
* This function will be replaced with `receiveOrClosed: ReceiveResult<E>` and
* extension `suspend fun <E: Any> ReceiveChannel<E>.receiveOrNull(): E?`
+ * It is obsolete because it does not distinguish closed channel and null elements.
*/
- @ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
- @Deprecated(level = DeprecationLevel.WARNING, message = "This method does not distinguish closed channel and null elements")
public suspend fun receiveOrNull(): E?
/**
diff --git a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt
index a6bf8f6..5cfb8e8 100644
--- a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt
+++ b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt
@@ -8,6 +8,7 @@
private var elements = arrayOfNulls<Any>(16)
private var head = 0
private var tail = 0
+ val isEmpty: Boolean get() = head == tail
public fun addLast(element: T) {
elements[tail] = element
diff --git a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
index 8866057..f37c356 100644
--- a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
@@ -54,7 +54,7 @@
}
@Test
- fun enterMultipleTimes() = runTest {
+ fun testEnterMultipleTimes() = runTest {
launch(Unconfined) {
expect(1)
}
@@ -70,5 +70,46 @@
finish(4)
}
+ @Test
+ fun testYield() = runTest {
+ expect(1)
+ launch(Dispatchers.Unconfined) {
+ expect(2)
+ yield()
+ launch {
+ expect(4)
+ }
+ expect(3)
+ yield()
+ expect(5)
+ }.join()
+
+ finish(6)
+ }
+
+ @Test
+ fun testCancellationWihYields() = runTest {
+ expect(1)
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ val job = coroutineContext[Job]!!
+ expect(2)
+ yield()
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ expect(4)
+ job.cancel()
+ expect(5)
+ }
+ expect(3)
+
+ try {
+ yield()
+ } finally {
+ expect(6)
+ }
+ }
+
+ finish(7)
+ }
+
class TestException : Throwable()
}
diff --git a/core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt b/core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt
index 4fe1fd8..0308800 100644
--- a/core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt
+++ b/core/kotlinx-coroutines-core/test/UnconfinedConcurrentStressTest.kt
@@ -19,9 +19,9 @@
executor.close()
}
- @Test(timeout = 10_000L)
+ @Test
fun testConcurrent() = runTest {
- val iterations = 10_000 * stressTestMultiplier
+ val iterations = 1_000 * stressTestMultiplier
val startBarrier = CyclicBarrier(threads + 1)
val finishLatch = CountDownLatch(threads)
diff --git a/integration/kotlinx-coroutines-play-services/build.gradle b/integration/kotlinx-coroutines-play-services/build.gradle
index 44eec3b..51cce3f 100644
--- a/integration/kotlinx-coroutines-play-services/build.gradle
+++ b/integration/kotlinx-coroutines-play-services/build.gradle
@@ -9,10 +9,6 @@
ext.tasks_version = '15.0.1'
-repositories {
- google()
-}
-
def attr = Attribute.of("artifactType", String.class)
configurations {
aar {
diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt
index 2181b1c..be5185d 100644
--- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt
+++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt
@@ -120,9 +120,11 @@
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- handler.postDelayed({
+ val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
- }, timeMillis.coerceAtMost(MAX_DELAY))
+ }
+ handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
+ continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {