EventLoop implements Delay
* Predictable single-threaded scheduling in runBlocking
* withTimeout used from runBlocking will release memory (important for tests)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 4db38bb..4aed46f 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -81,11 +81,11 @@
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
- val privateEventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
- val newContext = newCoroutineContext(context + (privateEventLoop ?: EmptyCoroutineContext))
- val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop != null)
+ val eventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
+ val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
+ val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop = eventLoop != null)
coroutine.initParentJob(context[Job])
- privateEventLoop?.initParentJob(coroutine)
+ eventLoop?.initParentJob(coroutine)
block.startCoroutine(coroutine, coroutine)
return coroutine.joinBlocking()
}
@@ -104,7 +104,7 @@
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
- val block: suspend CoroutineScope.() -> Unit
+ private val block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
override fun onStart() {
block.startCoroutine(this, this)
@@ -120,11 +120,15 @@
private class BlockingCoroutine<T>(
override val parentContext: CoroutineContext,
- val blockedThread: Thread,
- val hasPrivateEventLoop: Boolean
+ private val blockedThread: Thread,
+ private val privateEventLoop: Boolean
) : AbstractCoroutine<T>(active = true) {
val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
+ init {
+ if (privateEventLoop) require(eventLoop is EventLoopImpl)
+ }
+
override fun afterCompletion(state: Any?, mode: Int) {
if (Thread.currentThread() != blockedThread)
LockSupport.unpark(blockedThread)
@@ -132,15 +136,15 @@
@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T {
- while (isActive) {
+ while (true) {
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
- if (eventLoop == null || !eventLoop.processNextEvent())
- LockSupport.park(this)
+ val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
+ // note: process next even may look unpark flag, so check !isActive before parking
+ if (!isActive) break
+ LockSupport.parkNanos(this, parkNanos)
}
- // process remaining events (that could have been added after last processNextEvent and before cancel
- if (hasPrivateEventLoop) {
- while (eventLoop!!.processNextEvent()) { /* just spin */ }
- }
+ // process queued events (that could have been added after last processNextEvent and before cancel
+ if (privateEventLoop) (eventLoop as EventLoopImpl).shutdown()
// now return result
val state = this.state
(state as? CompletedExceptionally)?.let { throw it.exception }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 041aa9c..632978a 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -18,20 +18,29 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import java.util.concurrent.ConcurrentSkipListMap
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.LockSupport
import kotlin.coroutines.experimental.CoroutineContext
/**
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
- * be asked to process next event from their event queue. It is used by [runBlocking] to
+ * be asked to process next event from their event queue.
+ *
+ * It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
* continue processing events when invoked from the event dispatch thread.
*/
public interface EventLoop {
/**
- * Processes next event in this event loop and returns `true` or returns `false` if there are
- * no events to process or when invoked from the wrong thread.
+ * Processes next event in this event loop.
+ *
+ * The result of this function is to be interpreted like this:
+ * * `<= 0` -- there are potentially more events for immediate processing;
+ * * `> 0` -- a number of nanoseconds to wait for next scheduled event;
+ * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
*/
- public fun processNextEvent(): Boolean
+ public fun processNextEvent(): Long
public companion object Factory {
/**
@@ -43,7 +52,7 @@
* ```
* while (needsToBeRunning) {
* if (Thread.interrupted()) break // or handle somehow
- * if (!eventLoop.processNextEvent()) LockSupport.park() // event loop will unpark
+ * LockSupport.parkNanos(eventLoop.processNextEvent()) // event loop will unpark
* }
* ```
*/
@@ -55,10 +64,12 @@
}
internal class EventLoopImpl(
- val thread: Thread
-) : CoroutineDispatcher(), EventLoop {
- val queue = LockFreeLinkedListHead()
- var parentJob: Job? = null
+ private val thread: Thread
+) : CoroutineDispatcher(), EventLoop, Delay {
+ private val queue = LockFreeLinkedListHead()
+ private val delayed = ConcurrentSkipListMap<DelayedTask, DelayedTask>()
+ private val nextSequence = AtomicLong()
+ private var parentJob: Job? = null
fun initParentJob(coroutine: Job) {
require(this.parentJob == null)
@@ -66,37 +77,127 @@
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Dispatch(block))
- }
-
- fun schedule(node: Node): Boolean {
- val added = if (parentJob == null) {
- queue.addLast(node)
- true
- } else
- queue.addLastIf(node) { !parentJob!!.isCompleted }
- if (added) {
- if (Thread.currentThread() !== thread)
- LockSupport.unpark(thread)
+ if (scheduleQueued(QueuedRunnableTask(block))) {
+ unpark()
} else {
- node.run()
+ block.run()
}
- return added
}
- override fun processNextEvent(): Boolean {
- if (Thread.currentThread() !== thread) return false
- (queue.removeFirstOrNull() as? Runnable)?.apply {
- run()
+ override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+ if (scheduleDelayed(DelayedResumeTask(time, unit, continuation))) {
+ // todo: we should unpark only when this delayed task became first in the queue
+ unpark()
+ } else {
+ scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
+ }
+ }
+
+ override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
+ DelayedRunnableTask(time, unit, block).also { scheduleDelayed(it) }
+
+ override fun processNextEvent(): Long {
+ if (Thread.currentThread() !== thread) return Long.MAX_VALUE
+ // queue all delayed tasks that are due to be executed
+ while (true) {
+ val delayedTask = delayed.firstEntry()?.key ?: break
+ val now = System.nanoTime()
+ if (delayedTask.nanoTime - now > 0) break
+ if (!scheduleQueued(delayedTask)) break
+ delayed.remove(delayedTask)
+ }
+ // then process one event from queue
+ (queue.removeFirstOrNull() as? QueuedTask)?.let { queuedTask ->
+ queuedTask()
+ }
+ if (!queue.isEmpty) return 0
+ val nextDelayedTask = delayed.firstEntry()?.key ?: return Long.MAX_VALUE
+ return nextDelayedTask.nanoTime - System.nanoTime()
+ }
+
+ fun shutdown() {
+ // complete processing of all queued tasks
+ while (true) {
+ val queuedTask = (queue.removeFirstOrNull() ?: break) as QueuedTask
+ queuedTask()
+ }
+ // cancel all delayed tasks
+ while (true) {
+ val delayedTask = delayed.pollFirstEntry()?.key ?: break
+ delayedTask.cancel()
+ }
+ }
+
+ override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
+
+ private fun scheduleQueued(queuedTask: QueuedTask): Boolean {
+ if (parentJob == null) {
+ queue.addLast(queuedTask)
return true
}
+ return queue.addLastIf(queuedTask, { !parentJob!!.isCompleted })
+ }
+
+ private fun scheduleDelayed(delayedTask: DelayedTask): Boolean {
+ delayed.put(delayedTask, delayedTask)
+ if (parentJob?.isActive != false) return true
+ delayedTask.dispose()
return false
}
- abstract class Node : LockFreeLinkedListNode(), Runnable
+ private fun unpark() {
+ if (Thread.currentThread() !== thread)
+ LockSupport.unpark(thread)
+ }
- class Dispatch(block: Runnable) : Node(), Runnable by block
+ private abstract class QueuedTask : LockFreeLinkedListNode(), () -> Unit
- override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
+ private class QueuedRunnableTask(
+ private val block: Runnable
+ ) : QueuedTask() {
+ override fun invoke() { block.run() }
+ }
+
+ private abstract inner class DelayedTask(
+ time: Long, timeUnit: TimeUnit
+ ) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle {
+ @JvmField val nanoTime: Long = System.nanoTime() + timeUnit.toNanos(time)
+ @JvmField val sequence: Long = nextSequence.getAndIncrement()
+
+ override fun compareTo(other: DelayedTask): Int {
+ val dTime = nanoTime - other.nanoTime
+ if (dTime > 0) return 1
+ if (dTime < 0) return -1
+ val dSequence = sequence - other.sequence
+ return if (dSequence > 0) 1 else if (dSequence < 0) -1 else 0
+ }
+
+ override final fun dispose() {
+ delayed.remove(this)
+ cancel()
+ }
+
+ open fun cancel() {}
+ }
+
+ private inner class DelayedResumeTask(
+ time: Long, timeUnit: TimeUnit,
+ private val cont: CancellableContinuation<Unit>
+ ) : DelayedTask(time, timeUnit) {
+ override fun invoke() {
+ with(cont) { resumeUndispatched(Unit) }
+ }
+ override fun cancel() {
+ if (!cont.isActive) return
+ val remaining = nanoTime - System.nanoTime()
+ scheduledExecutor.schedule(ResumeRunnable(cont), remaining, TimeUnit.NANOSECONDS)
+ }
+ }
+
+ private inner class DelayedRunnableTask(
+ time: Long, timeUnit: TimeUnit,
+ private val block: Runnable
+ ) : DelayedTask(time, timeUnit) {
+ override fun invoke() { block.run() }
+ }
}
-
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunBlockingTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunBlockingTest.kt
new file mode 100644
index 0000000..2d3af81
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunBlockingTest.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.junit.Test
+
+class RunBlockingTest {
+ /**
+ * Tests that a 100% CPU-consuming loop will react on timeout if it has yields.
+ */
+ @Test(expected = CancellationException::class)
+ fun testYieldBlockingWithTimeout() = runBlocking {
+ withTimeout(100) {
+ while (true) {
+ yield()
+ }
+ }
+ }
+}
\ No newline at end of file