EventLoop implements Delay

* Predictable single-threaded scheduling in runBlocking
* withTimeout used from runBlocking will release memory (important for tests)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 4db38bb..4aed46f 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -81,11 +81,11 @@
 @Throws(InterruptedException::class)
 public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
     val currentThread = Thread.currentThread()
-    val privateEventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
-    val newContext = newCoroutineContext(context + (privateEventLoop ?: EmptyCoroutineContext))
-    val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop != null)
+    val eventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
+    val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
+    val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop = eventLoop != null)
     coroutine.initParentJob(context[Job])
-    privateEventLoop?.initParentJob(coroutine)
+    eventLoop?.initParentJob(coroutine)
     block.startCoroutine(coroutine, coroutine)
     return coroutine.joinBlocking()
 }
@@ -104,7 +104,7 @@
 
 private class LazyStandaloneCoroutine(
     parentContext: CoroutineContext,
-    val block: suspend CoroutineScope.() -> Unit
+    private val block: suspend CoroutineScope.() -> Unit
 ) : StandaloneCoroutine(parentContext, active = false) {
     override fun onStart() {
         block.startCoroutine(this, this)
@@ -120,11 +120,15 @@
 
 private class BlockingCoroutine<T>(
     override val parentContext: CoroutineContext,
-    val blockedThread: Thread,
-    val hasPrivateEventLoop: Boolean
+    private val blockedThread: Thread,
+    private val privateEventLoop: Boolean
 ) : AbstractCoroutine<T>(active = true) {
     val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
 
+    init {
+        if (privateEventLoop) require(eventLoop is EventLoopImpl)
+    }
+
     override fun afterCompletion(state: Any?, mode: Int) {
         if (Thread.currentThread() != blockedThread)
             LockSupport.unpark(blockedThread)
@@ -132,15 +136,15 @@
 
     @Suppress("UNCHECKED_CAST")
     fun joinBlocking(): T {
-        while (isActive) {
+        while (true) {
             if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
-            if (eventLoop == null || !eventLoop.processNextEvent())
-                LockSupport.park(this)
+            val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
+            // note: process next even may look unpark flag, so check !isActive before parking
+            if (!isActive) break
+            LockSupport.parkNanos(this, parkNanos)
         }
-        // process remaining events (that could have been added after last processNextEvent and before cancel
-        if (hasPrivateEventLoop) {
-            while (eventLoop!!.processNextEvent()) { /* just spin */ }
-        }
+        // process queued events (that could have been added after last processNextEvent and before cancel
+        if (privateEventLoop) (eventLoop as EventLoopImpl).shutdown()
         // now return result
         val state = this.state
         (state as? CompletedExceptionally)?.let { throw it.exception }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 041aa9c..632978a 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -18,20 +18,29 @@
 
 import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
 import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import java.util.concurrent.ConcurrentSkipListMap
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.locks.LockSupport
 import kotlin.coroutines.experimental.CoroutineContext
 
 /**
  * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
- * be asked to process next event from their event queue. It is used by [runBlocking] to
+ * be asked to process next event from their event queue.
+ *
+ * It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
  * continue processing events when invoked from the event dispatch thread.
  */
 public interface EventLoop {
     /**
-     * Processes next event in this event loop and returns `true` or returns `false` if there are
-     * no events to process or when invoked from the wrong thread.
+     * Processes next event in this event loop.
+     *
+     * The result of this function is to be interpreted like this:
+     * * `<= 0` -- there are potentially more events for immediate processing;
+     * * `> 0` -- a number of nanoseconds to wait for next scheduled event;
+     * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
      */
-    public fun processNextEvent(): Boolean
+    public fun processNextEvent(): Long
 
     public companion object Factory {
         /**
@@ -43,7 +52,7 @@
          * ```
          * while (needsToBeRunning) {
          *     if (Thread.interrupted()) break // or handle somehow
-         *     if (!eventLoop.processNextEvent()) LockSupport.park() // event loop will unpark
+         *     LockSupport.parkNanos(eventLoop.processNextEvent()) // event loop will unpark
          * }
          * ```
          */
@@ -55,10 +64,12 @@
 }
 
 internal class EventLoopImpl(
-    val thread: Thread
-) : CoroutineDispatcher(), EventLoop {
-    val queue = LockFreeLinkedListHead()
-    var parentJob: Job? = null
+    private val thread: Thread
+) : CoroutineDispatcher(), EventLoop, Delay {
+    private val queue = LockFreeLinkedListHead()
+    private val delayed = ConcurrentSkipListMap<DelayedTask, DelayedTask>()
+    private val nextSequence = AtomicLong()
+    private var parentJob: Job? = null
 
     fun initParentJob(coroutine: Job) {
         require(this.parentJob == null)
@@ -66,37 +77,127 @@
     }
 
     override fun dispatch(context: CoroutineContext, block: Runnable) {
-        schedule(Dispatch(block))
-    }
-
-    fun schedule(node: Node): Boolean {
-        val added = if (parentJob == null) {
-            queue.addLast(node)
-            true
-        } else
-            queue.addLastIf(node) { !parentJob!!.isCompleted }
-        if (added) {
-            if (Thread.currentThread() !== thread)
-                LockSupport.unpark(thread)
+        if (scheduleQueued(QueuedRunnableTask(block))) {
+            unpark()
         } else {
-            node.run()
+            block.run()
         }
-        return added
     }
 
-    override fun processNextEvent(): Boolean {
-        if (Thread.currentThread() !== thread) return false
-        (queue.removeFirstOrNull() as? Runnable)?.apply {
-            run()
+    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+        if (scheduleDelayed(DelayedResumeTask(time, unit, continuation))) {
+            // todo: we should unpark only when this delayed task became first in the queue
+            unpark()
+        } else {
+            scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
+        }
+    }
+
+    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
+        DelayedRunnableTask(time, unit, block).also { scheduleDelayed(it) }
+
+    override fun processNextEvent(): Long {
+        if (Thread.currentThread() !== thread) return Long.MAX_VALUE
+        // queue all delayed tasks that are due to be executed
+        while (true) {
+            val delayedTask = delayed.firstEntry()?.key ?: break
+            val now = System.nanoTime()
+            if (delayedTask.nanoTime - now > 0) break
+            if (!scheduleQueued(delayedTask)) break
+            delayed.remove(delayedTask)
+        }
+        // then process one event from queue
+        (queue.removeFirstOrNull() as? QueuedTask)?.let { queuedTask ->
+            queuedTask()
+        }
+        if (!queue.isEmpty) return 0
+        val nextDelayedTask = delayed.firstEntry()?.key ?: return Long.MAX_VALUE
+        return nextDelayedTask.nanoTime - System.nanoTime()
+    }
+
+    fun shutdown() {
+        // complete processing of all queued tasks
+        while (true) {
+            val queuedTask = (queue.removeFirstOrNull() ?: break) as QueuedTask
+            queuedTask()
+        }
+        // cancel all delayed tasks
+        while (true) {
+            val delayedTask = delayed.pollFirstEntry()?.key ?: break
+            delayedTask.cancel()
+        }
+    }
+
+    override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
+
+    private fun scheduleQueued(queuedTask: QueuedTask): Boolean {
+        if (parentJob == null) {
+            queue.addLast(queuedTask)
             return true
         }
+        return queue.addLastIf(queuedTask, { !parentJob!!.isCompleted })
+    }
+
+    private fun scheduleDelayed(delayedTask: DelayedTask): Boolean {
+        delayed.put(delayedTask, delayedTask)
+        if (parentJob?.isActive != false) return true
+        delayedTask.dispose()
         return false
     }
 
-    abstract class Node : LockFreeLinkedListNode(), Runnable
+    private fun unpark() {
+        if (Thread.currentThread() !== thread)
+            LockSupport.unpark(thread)
+    }
 
-    class Dispatch(block: Runnable) : Node(), Runnable by block
+    private abstract class QueuedTask : LockFreeLinkedListNode(), () -> Unit
 
-    override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
+    private class QueuedRunnableTask(
+        private val block: Runnable
+    ) : QueuedTask() {
+        override fun invoke() { block.run() }
+    }
+
+    private abstract inner class DelayedTask(
+        time: Long, timeUnit: TimeUnit
+    ) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle {
+        @JvmField val nanoTime: Long = System.nanoTime() + timeUnit.toNanos(time)
+        @JvmField val sequence: Long = nextSequence.getAndIncrement()
+
+        override fun compareTo(other: DelayedTask): Int {
+            val dTime = nanoTime - other.nanoTime
+            if (dTime > 0) return 1
+            if (dTime < 0) return -1
+            val dSequence = sequence - other.sequence
+            return if (dSequence > 0) 1 else if (dSequence < 0) -1 else 0
+        }
+
+        override final fun dispose() {
+            delayed.remove(this)
+            cancel()
+        }
+
+        open fun cancel() {}
+    }
+
+    private inner class DelayedResumeTask(
+        time: Long, timeUnit: TimeUnit,
+        private val cont: CancellableContinuation<Unit>
+    ) : DelayedTask(time, timeUnit) {
+        override fun invoke() {
+            with(cont) { resumeUndispatched(Unit) }
+        }
+        override fun cancel() {
+            if (!cont.isActive) return
+            val remaining = nanoTime - System.nanoTime()
+            scheduledExecutor.schedule(ResumeRunnable(cont), remaining, TimeUnit.NANOSECONDS)
+        }
+    }
+
+    private inner class DelayedRunnableTask(
+        time: Long, timeUnit: TimeUnit,
+        private val block: Runnable
+    ) : DelayedTask(time, timeUnit) {
+        override fun invoke() { block.run() }
+    }
 }
-
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunBlockingTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunBlockingTest.kt
new file mode 100644
index 0000000..2d3af81
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunBlockingTest.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.junit.Test
+
+class RunBlockingTest {
+    /**
+     * Tests that a 100% CPU-consuming loop will react on timeout if it has yields.
+     */
+    @Test(expected = CancellationException::class)
+    fun testYieldBlockingWithTimeout() = runBlocking {
+        withTimeout(100) {
+            while (true) {
+                yield()
+            }
+        }
+    }
+}
\ No newline at end of file