JS: Introduce Window.asCoroutineDispatcher and use custom queue in
Window.awaitAnimationFrame to align all animations
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
index 17583fc..d34aa08 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineContext.kt
@@ -16,6 +16,7 @@
package kotlinx.coroutines.experimental
+import kotlin.browser.*
import kotlin.coroutines.experimental.*
/**
@@ -40,7 +41,11 @@
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*/
@Suppress("PropertyName")
-public actual val DefaultDispatcher: CoroutineDispatcher = JSDispatcher
+public actual val DefaultDispatcher: CoroutineDispatcher = when {
+ // Check if we are in the browser and must use window.postMessage to avoid setTimeout throttling
+ jsTypeOf(window) != "undefined" -> window.asCoroutineDispatcher()
+ else -> NodeDispatcher()
+}
/**
* Creates context for the new coroutine. It installs [DefaultDispatcher] when no other dispatcher nor
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index 05937a4..10b7c18 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -72,4 +72,5 @@
}
/** Returns [Delay] implementation of the given context */
-internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: JSDispatcher
+internal val CoroutineContext.delay: Delay get() =
+ get(ContinuationInterceptor) as? Delay ?: (DefaultDispatcher as Delay)
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
index d74f83f..53cadc1 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
@@ -16,20 +16,12 @@
package kotlinx.coroutines.experimental
-import kotlin.browser.*
import kotlin.coroutines.experimental.*
+import org.w3c.dom.*
-internal object JSDispatcher : CoroutineDispatcher(), Delay {
- // Check if we are in the browser and must use postMessage to avoid setTimeout throttling
- private val messageQueue =
- if (jsTypeOf(window) != "undefined") MessageQueue().apply { register() } else null
-
+internal class NodeDispatcher : CoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
- if (messageQueue != null) {
- messageQueue.enqueue(block)
- } else {
- setTimeout({ block.run() }, 0)
- }
+ setTimeout({ block.run() }, 0)
}
override fun scheduleResumeAfterDelay(time: Int, continuation: CancellableContinuation<Unit>) {
@@ -46,60 +38,105 @@
}
}
-// it is open for tests
-internal open class MessageQueue {
- val yieldEvery = 16 // yield to JS event loop after this many processed messages
+internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
+ private val messageName = "dispatchCoroutine"
- private val messageName = "JSDispatcher.dispatch"
- private var scheduled = false
+ private val queue = object : MessageQueue() {
+ override fun schedule() {
+ window.postMessage(messageName, "*")
+ }
+ }
- private var queue = arrayOfNulls<Runnable>(8)
- private var head = 0
- private var tail = 0
-
- fun register() {
+ init {
window.addEventListener("message", { event: dynamic ->
if (event.source == window && event.data == messageName) {
event.stopPropagation()
- process()
+ queue.process()
}
}, true)
}
- // it is open for tests
- open fun schedule() {
- window.postMessage(messageName, "*")
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ queue.enqueue(block)
}
- val isEmpty get() = head == tail
-
- fun poll(): Runnable? {
- if (isEmpty) return null
- val result = queue[head]!!
- queue[head] = null
- head = head.next()
- return result
+ override fun scheduleResumeAfterDelay(time: Int, continuation: CancellableContinuation<Unit>) {
+ window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.coerceAtLeast(0))
}
- tailrec fun enqueue(block: Runnable) {
- val newTail = tail.next()
- if (newTail == head) {
- resize()
- enqueue(block) // retry with larger size
- return
+ override fun invokeOnTimeout(time: Int, block: Runnable): DisposableHandle {
+ val handle = window.setTimeout({ block.run() }, time.coerceAtLeast(0))
+ return object : DisposableHandle {
+ override fun dispose() {
+ window.clearTimeout(handle)
+ }
}
- queue[tail] = block
- tail = newTail
+ }
+}
+
+internal abstract class MessageQueue : Queue<Runnable>() {
+ val yieldEvery = 16 // yield to JS event loop after this many processed messages
+
+ private var scheduled = false
+
+ abstract fun schedule()
+
+ fun enqueue(element: Runnable) {
+ add(element)
if (!scheduled) {
scheduled = true
schedule()
}
}
- fun resize() {
+ fun process() {
+ try {
+ // limit number of processed messages
+ repeat(yieldEvery) {
+ val element = poll() ?: return@process
+ element.run()
+ }
+ } finally {
+ if (isEmpty) {
+ scheduled = false
+ } else {
+ schedule()
+ }
+ }
+ }
+}
+
+internal open class Queue<T : Any> {
+ private var queue = arrayOfNulls<Any?>(8)
+ private var head = 0
+ private var tail = 0
+
+ val isEmpty get() = head == tail
+
+ fun poll(): T? {
+ if (isEmpty) return null
+ val result = queue[head]!!
+ queue[head] = null
+ head = head.next()
+ @Suppress("UNCHECKED_CAST")
+ return result as T
+ }
+
+ tailrec fun add(element: T) {
+ val newTail = tail.next()
+ if (newTail == head) {
+ resize()
+ add(element) // retry with larger size
+ return
+ }
+ queue[tail] = element
+ tail = newTail
+ }
+
+ private fun resize() {
var i = head
var j = 0
- val a = arrayOfNulls<Runnable>(queue.size * 2)
+ val a = arrayOfNulls<Any?>(queue.size * 2)
while (i != tail) {
a[j++] = queue[i]
i = i.next()
@@ -113,22 +150,6 @@
val j = this + 1
return if (j == queue.size) 0 else j
}
-
- fun process() {
- try {
- // limit number of processed messages
- repeat(yieldEvery) {
- val block = poll() ?: return@process
- block.run()
- }
- } finally {
- if (isEmpty) {
- scheduled = false
- } else {
- schedule()
- }
- }
- }
}
// We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Window.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Window.kt
index ae990ce..de74844 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Window.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Window.kt
@@ -19,16 +19,56 @@
import org.w3c.dom.Window
/**
+ * Converts an instance of [Window] to an implementation of [CoroutineDispatcher].
+ */
+public fun Window.asCoroutineDispatcher(): CoroutineDispatcher =
+ @Suppress("UnsafeCastFromDynamic")
+ asDynamic().coroutineDispatcher ?: WindowDispatcher(this).also {
+ asDynamic().coroutineDispatcher = it
+ }
+
+/**
* Suspends coroutine until next JS animation frame and returns frame time on resumption.
* The time is consistent with [window.performance.now()][org.w3c.performance.Performance.now].
* This function is cancellable. If the [Job] of the current coroutine is completed while this suspending
* function is waiting, this function immediately resumes with [CancellationException].
*/
public suspend fun Window.awaitAnimationFrame(): Double = suspendCancellableCoroutine { cont ->
- val handle = requestAnimationFrame { timestamp ->
- with(cont) { DefaultDispatcher.resumeUndispatched(timestamp) }
- }
- cont.invokeOnCompletion {
- cancelAnimationFrame(handle)
- }
+ asWindowAnimationQueue().enqueue(cont)
}
+
+private fun Window.asWindowAnimationQueue(): WindowAnimationQueue =
+ @Suppress("UnsafeCastFromDynamic")
+ asDynamic().coroutineAnimationQueue ?: WindowAnimationQueue(this).also {
+ asDynamic().coroutineAnimationQueue = it
+ }
+
+private class WindowAnimationQueue(private val window: Window) {
+ private val dispatcher = window.asCoroutineDispatcher()
+ private var scheduled = false
+ private var current = Queue<CancellableContinuation<Double>>()
+ private var next = Queue<CancellableContinuation<Double>>()
+ private var timestamp = 0.0
+
+ fun enqueue(cont: CancellableContinuation<Double>) {
+ next.add(cont)
+ if (!scheduled) {
+ scheduled = true
+ window.requestAnimationFrame { ts ->
+ timestamp = ts
+ val prev = current
+ current = next
+ next = prev
+ scheduled = false
+ process()
+ }
+ }
+ }
+
+ fun process() {
+ while(true) {
+ val element = current.poll() ?: return
+ with(element) { dispatcher.resumeUndispatched(timestamp) }
+ }
+ }
+}
\ No newline at end of file