blob: 632978a3a17d0b4e5fc2a54bdcf7bdac1043777a [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import java.util.concurrent.ConcurrentSkipListMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.LockSupport
import kotlin.coroutines.experimental.CoroutineContext
/**
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
* be asked to process next event from their event queue.
*
* It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
* continue processing events when invoked from the event dispatch thread.
*/
public interface EventLoop {
/**
* Processes next event in this event loop.
*
* The result of this function is to be interpreted like this:
* * `<= 0` -- there are potentially more events for immediate processing;
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
*/
public fun processNextEvent(): Long
public companion object Factory {
/**
* Creates a new event loop that is bound the specified [thread] (current thread by default) and
* stops accepting new events when [parentJob] completes. Every continuation that is scheduled
* onto this event loop unparks the specified thread via [LockSupport.unpark].
*
* The main event-processing loop using the resulting `eventLoop` object should look like this:
* ```
* while (needsToBeRunning) {
* if (Thread.interrupted()) break // or handle somehow
* LockSupport.parkNanos(eventLoop.processNextEvent()) // event loop will unpark
* }
* ```
*/
public operator fun invoke(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher =
EventLoopImpl(thread).apply {
if (parentJob != null) initParentJob(parentJob)
}
}
}
internal class EventLoopImpl(
private val thread: Thread
) : CoroutineDispatcher(), EventLoop, Delay {
private val queue = LockFreeLinkedListHead()
private val delayed = ConcurrentSkipListMap<DelayedTask, DelayedTask>()
private val nextSequence = AtomicLong()
private var parentJob: Job? = null
fun initParentJob(coroutine: Job) {
require(this.parentJob == null)
this.parentJob = coroutine
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (scheduleQueued(QueuedRunnableTask(block))) {
unpark()
} else {
block.run()
}
}
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
if (scheduleDelayed(DelayedResumeTask(time, unit, continuation))) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
}
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
DelayedRunnableTask(time, unit, block).also { scheduleDelayed(it) }
override fun processNextEvent(): Long {
if (Thread.currentThread() !== thread) return Long.MAX_VALUE
// queue all delayed tasks that are due to be executed
while (true) {
val delayedTask = delayed.firstEntry()?.key ?: break
val now = System.nanoTime()
if (delayedTask.nanoTime - now > 0) break
if (!scheduleQueued(delayedTask)) break
delayed.remove(delayedTask)
}
// then process one event from queue
(queue.removeFirstOrNull() as? QueuedTask)?.let { queuedTask ->
queuedTask()
}
if (!queue.isEmpty) return 0
val nextDelayedTask = delayed.firstEntry()?.key ?: return Long.MAX_VALUE
return nextDelayedTask.nanoTime - System.nanoTime()
}
fun shutdown() {
// complete processing of all queued tasks
while (true) {
val queuedTask = (queue.removeFirstOrNull() ?: break) as QueuedTask
queuedTask()
}
// cancel all delayed tasks
while (true) {
val delayedTask = delayed.pollFirstEntry()?.key ?: break
delayedTask.cancel()
}
}
override fun toString(): String = "EventLoopImpl@${Integer.toHexString(System.identityHashCode(this))}"
private fun scheduleQueued(queuedTask: QueuedTask): Boolean {
if (parentJob == null) {
queue.addLast(queuedTask)
return true
}
return queue.addLastIf(queuedTask, { !parentJob!!.isCompleted })
}
private fun scheduleDelayed(delayedTask: DelayedTask): Boolean {
delayed.put(delayedTask, delayedTask)
if (parentJob?.isActive != false) return true
delayedTask.dispose()
return false
}
private fun unpark() {
if (Thread.currentThread() !== thread)
LockSupport.unpark(thread)
}
private abstract class QueuedTask : LockFreeLinkedListNode(), () -> Unit
private class QueuedRunnableTask(
private val block: Runnable
) : QueuedTask() {
override fun invoke() { block.run() }
}
private abstract inner class DelayedTask(
time: Long, timeUnit: TimeUnit
) : QueuedTask(), Comparable<DelayedTask>, DisposableHandle {
@JvmField val nanoTime: Long = System.nanoTime() + timeUnit.toNanos(time)
@JvmField val sequence: Long = nextSequence.getAndIncrement()
override fun compareTo(other: DelayedTask): Int {
val dTime = nanoTime - other.nanoTime
if (dTime > 0) return 1
if (dTime < 0) return -1
val dSequence = sequence - other.sequence
return if (dSequence > 0) 1 else if (dSequence < 0) -1 else 0
}
override final fun dispose() {
delayed.remove(this)
cancel()
}
open fun cancel() {}
}
private inner class DelayedResumeTask(
time: Long, timeUnit: TimeUnit,
private val cont: CancellableContinuation<Unit>
) : DelayedTask(time, timeUnit) {
override fun invoke() {
with(cont) { resumeUndispatched(Unit) }
}
override fun cancel() {
if (!cont.isActive) return
val remaining = nanoTime - System.nanoTime()
scheduledExecutor.schedule(ResumeRunnable(cont), remaining, TimeUnit.NANOSECONDS)
}
}
private inner class DelayedRunnableTask(
time: Long, timeUnit: TimeUnit,
private val block: Runnable
) : DelayedTask(time, timeUnit) {
override fun invoke() { block.run() }
}
}