MPP: Delay, EventLoop & runBlocking moved to common code
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonBuilders.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonBuilders.kt
index 591829c..e01035b 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonBuilders.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonBuilders.kt
@@ -1,6 +1,7 @@
package kotlinx.coroutines.experimental
import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.EmptyCoroutineContext
@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")
public expect fun launch(
@@ -9,3 +10,8 @@
parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
): Job
+
+@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")
+public expect fun <T> runBlocking(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T): T
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonDelay.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonDelay.kt
new file mode 100644
index 0000000..f0d7e3f
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonDelay.kt
@@ -0,0 +1,5 @@
+package kotlinx.coroutines.experimental
+
+public expect interface Delay
+
+public expect suspend fun delay(time: Long)
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonEventLoop.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonEventLoop.kt
new file mode 100644
index 0000000..600e651
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonEventLoop.kt
@@ -0,0 +1,3 @@
+package kotlinx.coroutines.experimental
+
+public expect interface EventLoop
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonJob.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonJob.kt
index bf118b4..172d4fb 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonJob.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonJob.kt
@@ -43,7 +43,7 @@
public expect fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle
public expect suspend fun Job.cancelAndJoin()
-@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")
+@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER", "EXTENSION_SHADOWED_BY_MEMBER") // See KT-21598
public expect fun Job.cancelChildren(cause: Throwable? = null)
public expect suspend fun Job.joinChildren()
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index a280c50..e258870 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -164,7 +164,7 @@
* @param block the coroutine code.
*/
@Throws(InterruptedException::class)
-public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
+public actual fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop(currentThread) else null
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index 3ae168f..b2fb042 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -30,7 +30,7 @@
* Implementation of this interface affects operation of
* [delay][kotlinx.coroutines.experimental.delay] and [withTimeout] functions.
*/
-public interface Delay {
+public actual interface Delay {
/**
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
* This suspending function is cancellable.
@@ -81,8 +81,32 @@
*
* This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
+ *
+ * @param time time in milliseconds.
*/
-suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+public actual suspend fun delay(time: Long) {
+ kotlin.require(time >= 0) { "Delay time $time cannot be negative" }
+ if (time <= 0) return // don't delay
+ return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
+ cont.context.delay.scheduleResumeAfterDelay(time, TimeUnit.MILLISECONDS, cont)
+ }
+}
+
+/**
+ * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * Note, that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
+ * This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
+ * implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
+ *
+ * @param time time in the specified [unit].
+ * @param unit time unit.
+ */
+public suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
require(time >= 0) { "Delay time $time cannot be negative" }
if (time <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 6c8d930..782861f 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -31,7 +31,7 @@
* It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
* continue processing events when invoked from the event dispatch thread.
*/
-public interface EventLoop {
+public actual interface EventLoop {
/**
* Processes next event in this event loop.
*
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 15ceccd..ecd1b5b 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -267,6 +267,9 @@
@Deprecated(message = "For binary compatibility", level = DeprecationLevel.HIDDEN)
public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
+ /**
+ * @suppress **Deprecated**: Use with named `onCancelling` and `handler` parameters.
+ */
@Deprecated(message = "Use with named `onCancelling` and `handler` parameters", level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.invokeOnCompletion(onCancelling = onCancelling_, handler = handler)"))
public fun invokeOnCompletion(onCancelling_: Boolean = false, handler: CompletionHandler): DisposableHandle
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index 4d6fdb0..51aa9cf 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -67,7 +67,7 @@
/** Always returns [NonDisposableHandle]. */
@Suppress("OverridingDeprecatedMember")
- override fun invokeOnCompletion(onCancelling: Boolean, handler: CompletionHandler): DisposableHandle =
+ override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle =
NonDisposableHandle
/** Always returns [NonDisposableHandle]. */
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 9e56696..b50f47b 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -2,6 +2,8 @@
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.EmptyCoroutineContext
+import kotlin.coroutines.experimental.startCoroutine
/**
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
@@ -47,6 +49,27 @@
return coroutine
}
+/**
+ * Runs new coroutine with the private event loop until its completion.
+ * This function should not be used from coroutine. It is designed to bridge regular code
+ * to libraries that are written in suspending style, to be used in `main` functions and in tests.
+ *
+ * The default [CoroutineDispatcher] for this builder in an implementation of [EventLoop] that processes continuations
+ * in this blocked thread until the completion of this coroutine.
+ * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
+ *
+ * @param context context of the coroutine. The default value is an implementation of [EventLoop].
+ * @param block the coroutine code.
+ */
+public actual fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
+ val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop() else null
+ val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
+ val coroutine = BlockingCoroutine<T>(newContext, privateEventLoop = eventLoop != null)
+ coroutine.initParentJob(newContext[Job])
+ block.startCoroutine(coroutine, coroutine)
+ return coroutine.joinBlocking()
+}
+
// --------------- implementation ---------------
private open class StandaloneCoroutine(
@@ -68,3 +91,36 @@
}
}
+private class BlockingCoroutine<T>(
+ parentContext: CoroutineContext,
+ private val privateEventLoop: Boolean
+) : AbstractCoroutine<T>(parentContext, true) {
+ private val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
+
+ init {
+ if (privateEventLoop) require(eventLoop is BlockingEventLoop)
+ }
+
+ fun joinBlocking(): T {
+ while (true) {
+ val delay = eventLoop?.processNextEvent() ?: Double.MAX_VALUE
+ if (isCompleted) break
+ if (delay > 0) throw IllegalStateException("JS thread cannot be blocked, " +
+ "runBlocking { ... } cannot be waiting for its completion with timeout")
+ }
+ // process queued events (that could have been added after last processNextEvent and before cancel
+ if (privateEventLoop) (eventLoop as BlockingEventLoop).apply {
+ // We exit the "while" loop above when this coroutine's state "isCompleted",
+ // Here we should signal that BlockingEventLoop should not accept any more tasks
+ isCompleted = true
+ shutdown()
+ }
+ // now return result
+ val state = this.state
+ (state as? CompletedExceptionally)?.let { throw it.exception }
+ @Suppress("UNCHECKED_CAST")
+ return state as T
+ }
+}
+
+
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
index e27f2cb..8d3a788 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
@@ -26,12 +26,38 @@
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*/
@Suppress("PropertyName")
-public actual val DefaultDispatcher: CoroutineDispatcher = DefaultDispatcherImpl
+public actual val DefaultDispatcher: CoroutineDispatcher = DefaultExecutor
-private object DefaultDispatcherImpl : CoroutineDispatcher() {
+internal object DefaultExecutor : CoroutineDispatcher(), Delay {
+ fun enqueue(block: Runnable) {
+ window.setTimeout({ block.run() }, 0)
+ }
+
+ fun schedule(time: Double, block: Runnable): Int =
+ window.setTimeout({ block.run() }, time.timeToInt())
+
+ fun removeScheduled(handle: Int) {
+ window.clearTimeout(handle)
+ }
+
override fun dispatch(context: CoroutineContext, block: Runnable) {
window.setTimeout({ block.run() }, 0)
}
+
+ override fun scheduleResumeAfterDelay(time: Double, continuation: CancellableContinuation<Unit>) {
+ window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.timeToInt())
+ }
+
+ override fun invokeOnTimeout(time: Double, block: Runnable): DisposableHandle {
+ val handle = window.setTimeout({ block.run() }, time.timeToInt())
+ return object : DisposableHandle {
+ override fun dispose() {
+ window.clearTimeout(handle)
+ }
+ }
+ }
+
+ private fun Double.timeToInt(): Int = coerceIn(0.0..Int.MAX_VALUE.toDouble()).toInt()
}
/**
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
new file mode 100644
index 0000000..73d2bb4
--- /dev/null
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.experimental.ContinuationInterceptor
+import kotlin.coroutines.experimental.CoroutineContext
+
+/**
+ * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
+ * scheduled execution of tasks.
+ *
+ * Implementation of this interface affects operation of
+ * [delay][kotlinx.coroutines.experimental.delay] and [withTimeout] functions.
+ */
+public actual interface Delay {
+ /**
+ * Schedules resume of a specified [continuation] after a specified delay [time].
+ *
+ * Continuation **must be scheduled** to resume even if it is already cancelled, because a cancellation is just
+ * an exception that the coroutine that used `delay` might wanted to catch and process. It might
+ * need to close some resources in its `finally` blocks, for example.
+ *
+ * This implementation is supposed to use dispatcher's native ability for scheduled execution in its thread(s).
+ * In order to avoid an extra delay of execution, the following code shall be used to resume this
+ * [continuation] when the code is already executing in the appropriate dispatcher:
+ *
+ * ```kotlin
+ * with(continuation) { resumeUndispatched(Unit) }
+ * ```
+ */
+ fun scheduleResumeAfterDelay(time: Double, continuation: CancellableContinuation<Unit>)
+
+ /**
+ * Schedules invocation of a specified [block] after a specified delay [time].
+ * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation
+ * request if it is not needed anymore.
+ */
+ fun invokeOnTimeout(time: Double, block: Runnable): DisposableHandle
+}
+
+/**
+ * Delays coroutine for a given time without blocking and resumes it after a specified time.
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ *
+ * This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
+ * implements [Delay] interface, otherwise it resumes using a built-in scheduler.
+ *
+ * @param time time in milliseconds.
+ */
+public actual suspend fun delay(time: Long) {
+ val dt = time.toDouble()
+ kotlin.require(dt >= 0) { "Delay time $time cannot be negative" }
+ if (dt <= 0) return // don't delay
+ return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
+ cont.context.delay.scheduleResumeAfterDelay(dt, cont)
+ }
+}
+
+/** Returns [Delay] implementation of the given context */
+internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultExecutor
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
new file mode 100644
index 0000000..0b82edf
--- /dev/null
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlinx.coroutines.experimental.internal.*
+import kotlin.coroutines.experimental.CoroutineContext
+
+/**
+ * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
+ * be asked to process next event from their event queue.
+ *
+ * It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
+ * continue processing events when invoked.
+ */
+public actual interface EventLoop {
+ /**
+ * Processes next event in this event loop.
+ *
+ * The result of this function is to be interpreted like this:
+ * * `<= 0` -- there are potentially more events for immediate processing;
+ * * `> 0` -- a number of milliseconds to wait for next scheduled event;
+ * * [Double.MAX_VALUE] or infinity -- no more events.
+ */
+ public fun processNextEvent(): Double
+}
+
+private const val DELAYED = 0
+private const val REMOVED = 1
+private const val RESCHEDULED = 2
+
+internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
+ private val queue = LinkedListHead()
+ private val delayed = Heap<DelayedTask>()
+
+ protected abstract val isCompleted: Boolean
+
+ private val nextTime: Double
+ get() {
+ if (!queue.isEmpty) return 0.0
+ val nextDelayedTask = delayed.peek() ?: return Double.MAX_VALUE
+ return (nextDelayedTask.time - now()).coerceAtLeast(0.0)
+ }
+
+ override fun dispatch(context: CoroutineContext, block: Runnable) =
+ enqueue(block.toQueuedTask())
+
+ override fun scheduleResumeAfterDelay(time: Double, continuation: CancellableContinuation<Unit>) =
+ schedule(DelayedResumeTask(time, continuation))
+
+ override fun invokeOnTimeout(time: Double, block: Runnable): DisposableHandle =
+ DelayedRunnableTask(time, block).also { schedule(it) }
+
+ override fun processNextEvent(): Double {
+ // queue all delayed tasks that are due to be executed
+ if (!delayed.isEmpty) {
+ val now = now()
+ while (true) {
+ delayed.removeFirstIf {
+ if (it.timeToExecute(now)) {
+ queue.addLast(it)
+ true // proceed with remove
+ } else
+ false
+ } ?: break // quit loop when nothing more to remove
+ }
+ }
+ // then process one event from queue
+ (queue.removeFirstOrNull() as? QueuedTask)?.run()
+ return nextTime
+ }
+
+ private fun Runnable.toQueuedTask(): QueuedTask =
+ if (this is QueuedTask && isFresh) this else QueuedRunnableTask(this)
+
+ private fun enqueue(queuedTask: QueuedTask) {
+ if (!enqueueImpl(queuedTask))
+ DefaultExecutor.enqueue(queuedTask)
+ }
+
+ private fun enqueueImpl(queuedTask: QueuedTask): Boolean {
+ if (isCompleted) return false
+ queue.addLast(queuedTask)
+ return true
+ }
+
+ private fun schedule(delayedTask: DelayedTask) {
+ if (!scheduleImpl(delayedTask)) {
+ val remaining = delayedTask.time - now()
+ DefaultExecutor.schedule(remaining, delayedTask)
+ }
+ }
+
+ private fun scheduleImpl(delayedTask: DelayedTask): Boolean {
+ if (isCompleted) return false
+ delayed.addLast(delayedTask)
+ return true
+ }
+
+ protected fun rescheduleAllDelayed() {
+ while (true) {
+ val delayedTask = delayed.removeFirstOrNull() ?: break
+ delayedTask.rescheduleOnShutdown()
+ }
+ }
+
+ internal abstract class QueuedTask : LinkedListNode(), Runnable
+
+ private class QueuedRunnableTask(
+ private val block: Runnable
+ ) : QueuedTask() {
+ override fun run() { block.run() }
+ override fun toString(): String = block.toString()
+ }
+
+ internal abstract inner class DelayedTask(
+ delay: Double
+ ) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle, HeapNode {
+ override var index: Int = -1
+ var state = DELAYED
+ var handle = 0 // when state == RESCHEDULED
+ val time: Double = now() + delay
+
+ override fun compareTo(other: DelayedTask): Int {
+ val dTime = time - other.time
+ return when {
+ dTime > 0 -> 1
+ dTime < 0 -> -1
+ else -> 0
+ }
+ }
+
+ fun timeToExecute(now: Double): Boolean = now - time >= 0L
+
+ fun rescheduleOnShutdown() {
+ if (state != DELAYED) return
+ if (delayed.remove(this)) {
+ state = RESCHEDULED
+ handle = DefaultExecutor.schedule(time,this)
+ } else
+ state = REMOVED
+ }
+
+ override final fun dispose() {
+ when (state) {
+ DELAYED -> delayed.remove(this)
+ RESCHEDULED -> DefaultExecutor.removeScheduled(handle)
+ else -> return
+ }
+ state = REMOVED
+ }
+
+ override fun toString(): String = "Delayed[nanos=$time]"
+ }
+
+ private inner class DelayedResumeTask(
+ time: Double,
+ private val cont: CancellableContinuation<Unit>
+ ) : DelayedTask(time) {
+ override fun run() {
+ with(cont) { resumeUndispatched(Unit) }
+ }
+ }
+
+ private inner class DelayedRunnableTask(
+ time: Double,
+ private val block: Runnable
+ ) : DelayedTask(time) {
+ override fun run() { block.run() }
+ override fun toString(): String = super.toString() + block.toString()
+ }
+}
+
+internal class BlockingEventLoop() : EventLoopBase() {
+ public override var isCompleted: Boolean = false
+
+ fun shutdown() {
+ check(isCompleted)
+ // complete processing of all queued tasks
+ while (processNextEvent() <= 0) { /* spin */ }
+ // reschedule the rest of delayed tasks
+ rescheduleAllDelayed()
+ }
+}
+
+private fun now(): Double = js("Date.now()") as Double
\ No newline at end of file
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Heap.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Heap.kt
new file mode 100644
index 0000000..33704c8
--- /dev/null
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Heap.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+/**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface HeapNode {
+ public var index: Int
+}
+
+/**
+ * Binary heap.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public class Heap<T> where T: HeapNode, T: Comparable<T> {
+ private var a: Array<T?>? = null
+ private var size = 0
+
+ public val isEmpty: Boolean get() = size == 0
+
+ public fun peek(): T? = a?.get(0)
+
+ public fun removeFirstOrNull(): T? =
+ if (size > 0) {
+ removeAtImpl(0)
+ } else
+ null
+
+ public fun addLast(node: T) {
+ val a1 = realloc()
+ var i = size++
+ a1[i] = node
+ node.index = i
+ while (i > 0) {
+ val j = (i - 1) / 2
+ if (a1[j]!! <= a1[i]!!) break
+ swap(i, j)
+ i = j
+ }
+ }
+
+ public fun remove(node: T): Boolean =
+ if (node.index < 0) {
+ false
+ } else {
+ removeAtImpl(node.index)
+ true
+ }
+
+ @PublishedApi
+ internal fun removeAtImpl(index: Int): T {
+ check(size > 0)
+ val a = this.a!!
+ size--
+ if (index < size) {
+ swap(index, size)
+ var i = index
+ while (true) {
+ var j = 2 * i + 1
+ if (j >= size) break
+ if (j + 1 < size && a[j + 1]!! < a[j]!!) j++
+ if (a[i]!! <= a[j]!!) break
+ swap(i, j)
+ i = j
+ }
+ }
+ val result = a[size]!!
+ result.index = -1
+ a[size] = null
+ return result
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun realloc(): Array<T?> {
+ val a = this.a
+ return when {
+ a == null -> (arrayOfNulls<HeapNode>(4) as Array<T?>).also { this.a = it }
+ size >= a.size -> a.copyOf(size * 2).also { this.a = it }
+ else -> a
+ }
+ }
+
+ private fun swap(i: Int, j: Int) {
+ val a = a!!
+ val ni = a[j]!!
+ val nj = a[i]!!
+ a[i] = ni
+ a[j] = nj
+ ni.index = i
+ nj.index = j
+ }
+
+ public inline fun removeFirstIf(predicate: (T) -> Boolean): T? {
+ val first = peek() ?: return null
+ return if (predicate(first)) {
+ removeAtImpl(0)
+ } else
+ null
+ }
+}
\ No newline at end of file
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt
index a45cade..aaae216 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt
@@ -9,6 +9,7 @@
private set
public var isRemoved: Boolean = false
private set
+ public val isFresh: Boolean = next === this
public fun addLast(node: Node) {
val prev = this.prev
@@ -18,16 +19,20 @@
this.prev = node
}
- public open fun remove() {
+ public open fun remove(): Boolean {
+ if (isRemoved) return false
val prev = this.prev
val next = this.next
prev.next = next
next.prev = prev
isRemoved = true
+ return true
}
}
public open class LinkedListHead : LinkedListNode() {
+ public val isEmpty get() = next === this
+
/**
* Iterates over all elements in this list of a specified type.
*/
@@ -41,5 +46,12 @@
// just a defensive programming -- makes sure that list head sentinel is never removed
public final override fun remove() = throw UnsupportedOperationException()
+
+ fun removeFirstOrNull(): Node? {
+ val node = next
+ if (node === this) return null
+ node.remove()
+ return node
+ }
}