// blob: 430b361fc4d08bec79c37d14843a03a812d48492 (source-viewer header line, kept as a comment)
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.cinterop.*
import kotlinx.coroutines.internal.*
import platform.posix.*
import kotlin.coroutines.*
import kotlin.system.*
/**
 * Contract for [CoroutineDispatcher] implementations that own an event loop and can be driven
 * from the outside, one event at a time.
 *
 * An implementation may additionally implement the [Delay] interface to support time-scheduled tasks.
 * [runBlocking] uses this interface to keep processing events when it is invoked from the event dispatch thread.
 */
internal interface EventLoop {
    /**
     * Handles the next event of this event loop.
     *
     * How to read the returned value:
     * * `<= 0` -- more events may be available for immediate processing;
     * * `> 0` -- the number of nanoseconds to wait until the next scheduled event;
     * * [Long.MAX_VALUE] -- there are no more events, or the call was made from the wrong thread.
     */
    fun processNextEvent(): Long
}
/**
 * Builds a new event loop.
 *
 * @param parentJob optional job to attach; when it is non-null it is handed to `initParentJob`
 * on the freshly created loop before the loop is returned.
 * @return the new loop, typed as [CoroutineDispatcher].
 */
@Suppress("FunctionName")
internal fun EventLoop(parentJob: Job? = null): CoroutineDispatcher {
    val loop = EventLoopImpl()
    if (parentJob != null) {
        loop.initParentJob(parentJob)
    }
    return loop
}
// Values stored in DelayedTask.state.
// DELAYED: the initial state of every newly created DelayedTask.
private const val DELAYED = 0
// REMOVED: set by dispose(), and by rescheduleOnShutdown() when removing the task from the delayed heap fails.
private const val REMOVED = 1
// RESCHEDULED: set by rescheduleOnShutdown() right before the task is passed to DefaultExecutor.schedule.
private const val RESCHEDULED = 2
// Number of nanoseconds in one millisecond.
private const val MS_TO_NS = 1_000_000L

// Smallest millisecond value that is clamped instead of multiplied by MS_TO_NS.
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * Converts a delay given in milliseconds into nanoseconds.
 *
 * Non-positive delays become `0`, and delays of [MAX_MS] milliseconds or more are clamped
 * to [Long.MAX_VALUE] instead of being multiplied.
 */
private fun delayToNanos(timeMillis: Long): Long {
    if (timeMillis <= 0) return 0L
    if (timeMillis >= MAX_MS) return Long.MAX_VALUE
    return timeMillis * MS_TO_NS
}
// Marker stored in EventLoopBase._queue by closeQueue() when the queue is closed while it holds no task;
// enqueueImpl() returns false and dequeue() returns null once they see this value.
@Suppress("PrivatePropertyName")
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
// Short alias for the queue type that EventLoopBase stores in _queue.
private typealias Queue<T> = LockFreeMPSCQueueCore<T>
/**
 * Base event loop that is at the same time a [CoroutineDispatcher], a [Delay] and an [EventLoop].
 *
 * It keeps two collections:
 * * `_queue` -- tasks that are ready to run;
 * * `_delayed` -- a heap of [DelayedTask]s that wait for their time.
 *
 * [processNextEvent] moves due delayed tasks into the queue and then runs at most one queued task.
 * Once [isCompleted] reports `true`, new tasks are not accepted and are handed over to `DefaultExecutor`.
 */
internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<ThreadSafeHeap<DelayedTask>?>(null)

    /** `true` when this loop must not accept new tasks any more. */
    protected abstract val isCompleted: Boolean

    /** `true` when there are neither queued nor delayed tasks. */
    protected val isEmpty: Boolean
        get() = isQueueEmpty && isDelayedEmpty

    // The queue is empty when the slot is null, holds CLOSED_EMPTY, or holds a Queue that reports isEmpty.
    // A single task stored directly in the slot counts as non-empty.
    private val isQueueEmpty: Boolean get() {
        val queue = _queue.value
        return when (queue) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY
        }
    }

    // The delayed heap is empty when it was never allocated or reports isEmpty.
    private val isDelayedEmpty: Boolean get() {
        val delayed = _delayed.value
        return delayed == null || delayed.isEmpty
    }

    // 0 when the queue has tasks; otherwise the non-negative number of nanoseconds until the first
    // delayed task, or Long.MAX_VALUE when there are no delayed tasks.
    // Note that a delayed task that is already due also yields 0.
    private val nextTime: Long
        get() {
            if (!isQueueEmpty) return 0
            val delayed = _delayed.value ?: return Long.MAX_VALUE
            val nextDelayedTask = delayed.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
        }

    /** Dispatches [block] by passing it to [execute]; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) =
        execute(block)

    /** Schedules resumption of [continuation] after [timeMillis] milliseconds. */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
        schedule(DelayedResumeTask(timeMillis, continuation))

    /**
     * Schedules [block] to run after [timeMillis] milliseconds.
     *
     * @return the scheduled task itself, which serves as the handle to dispose it.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
        DelayedRunnableTask(timeMillis, block).also { schedule(it) }

    /**
     * Moves all due delayed tasks into the queue, runs at most one queued task and
     * returns the value described in [EventLoop.processNextEvent].
     */
    override fun processNextEvent(): Long {
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = nanoTime()
            while (true) {
                // make sure that moving from delayed to queue removes from delayed only after it is added to queue
                // to make sure that 'isEmpty' and `nextTime` that check both of them
                // do not transiently report that both delayed and queue are empty during move
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isCompleted"
            }
        }
        // then process one event from queue
        dequeue()?.run()
        return nextTime
    }

    /**
     * Adds [task] to the queue of this loop, or passes it to `DefaultExecutor`
     * when this loop does not accept tasks any more.
     */
    @Suppress("MemberVisibilityCanBePrivate") // todo: remove suppress when KT-22030 is fixed
    internal fun execute(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this task became first in the queue
            unpark()
        } else
            DefaultExecutor.execute(task)
    }

    // Tries to add the task to the queue slot.
    // Returns false when this loop is completed or the queue is closed; true when the task was added.
    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                // empty slot -- store the single task directly
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        // frozen -- switch the slot to the next queue and retry
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }

    // Takes the next task out of the queue slot, or returns null when there is none.
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    // frozen -- switch the slot to the next queue and retry
                    _queue.compareAndSet(queue, queue.next())
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    // single task stored directly in the slot -- take it and leave the slot empty
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
                }
            }
        }
    }

    /**
     * Closes the queue for new tasks. Expects [isCompleted] to be `true` already.
     */
    protected fun closeQueue() {
        assert(isCompleted)
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to full-blown queue to close
                        // NOTE(review): this branch publishes the new queue and returns without calling
                        // close() on it, unlike the Queue branch above -- confirm that relying on the
                        // isCompleted fail-fast check in enqueueImpl is intended here.
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
                        newQueue.addLast(queue as Runnable)
                        if (_queue.compareAndSet(queue, newQueue)) return
                    }
                }
            }
        }
    }

    /**
     * Adds [delayedTask] to the delayed heap of this loop, or passes it to `DefaultExecutor`
     * when this loop does not accept tasks any more.
     */
    internal fun schedule(delayedTask: DelayedTask) {
        if (scheduleImpl(delayedTask)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else
            DefaultExecutor.schedule(delayedTask)
    }

    // Returns false when this loop is completed, otherwise the result of adding the task to the heap.
    private fun scheduleImpl(delayedTask: DelayedTask): Boolean {
        if (isCompleted) return false
        // allocate the heap on first use; whoever wins the CAS defines the instance that is used
        val delayed = _delayed.value ?: run {
            _delayed.compareAndSet(null, ThreadSafeHeap())
            _delayed.value!!
        }
        return delayed.addLastIf(delayedTask) { !isCompleted }
    }

    /** Removes [delayedTask] from the delayed heap of this loop, if the heap was allocated. */
    internal fun removeDelayedImpl(delayedTask: DelayedTask) {
        _delayed.value?.remove(delayedTask)
    }

    // It performs "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    protected fun rescheduleAllDelayed() {
        while (true) {
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            delayedTask.rescheduleOnShutdown()
        }
    }

    /**
     * Shuts this loop down: closes the queue, runs the tasks that are still queued and then
     * passes the remaining delayed tasks to [rescheduleAllDelayed].
     */
    fun shutdown() {
        closeQueue()
        // Complete processing of all queued tasks.
        // The loop is driven by the state of the queue and not by the result of processNextEvent():
        // after the queue is closed a delayed task that is already due cannot be moved into the queue
        // any more (enqueueImpl fails fast on isCompleted), yet `nextTime` keeps reporting 0 for it,
        // so waiting for a positive result would spin here forever.
        do {
            processNextEvent()
        } while (!isQueueEmpty)
        // reschedule the rest of delayed tasks
        rescheduleAllDelayed()
    }

    /**
     * A task that becomes due [timeMillis] milliseconds after its creation.
     * Tasks are ordered by their target [nanoTime].
     */
    internal abstract inner class DelayedTask(
        timeMillis: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        override var index: Int = -1

        // One of DELAYED, REMOVED, RESCHEDULED; changed by rescheduleOnShutdown() and dispose()
        var state = DELAYED

        // Target time: creation time plus the requested delay converted to nanoseconds
        val nanoTime: Long = nanoTime() + delayToNanos(timeMillis)

        // Compares by the sign of the difference of target times rather than by the raw values
        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        /** `true` when the target time of this task is not later than [now]. */
        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        /**
         * Moves this task to `DefaultExecutor` if it is still [DELAYED] and can be removed from the
         * delayed heap of this loop; otherwise marks it as [REMOVED].
         */
        fun rescheduleOnShutdown() {
            if (state != DELAYED) return
            // NOTE(review): rescheduleAllDelayed() has already taken this task out of the heap with
            // removeFirstOrNull() before calling here, so whether remove(this) can still return true
            // depends on how ThreadSafeHeap.remove treats an absent node -- confirm that delayed tasks
            // are really rescheduled on shutdown and not silently marked as REMOVED.
            if (_delayed.value!!.remove(this)) {
                state = RESCHEDULED
                DefaultExecutor.schedule(this)
            } else
                state = REMOVED
        }

        /**
         * Cancels this task: removes it from the heap it currently belongs to
         * (this loop or `DefaultExecutor`) and marks it as [REMOVED].
         */
        final override fun dispose() {
            when (state) {
                DELAYED -> _delayed.value?.remove(this)
                RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
                else -> return
            }
            state = REMOVED
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    // Delayed task that resumes the given continuation with Unit when it runs
    private inner class DelayedResumeTask(
        timeMillis: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(timeMillis) {
        override fun run() {
            with(cont) { resumeUndispatched(Unit) }
        }
    }

    // Delayed task that runs the given block when it runs
    private inner class DelayedRunnableTask(
        timeMillis: Long,
        private val block: Runnable
    ) : DelayedTask(timeMillis) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }
}
/**
 * Event loop whose completion follows an optional parent [Job].
 */
private class EventLoopImpl : EventLoopBase() {
    private var parentJob: Job? = null

    // Without a parent job this loop is never reported as completed.
    override val isCompleted: Boolean
        get() {
            val job = parentJob ?: return false
            return job.isCompleted
        }

    /**
     * Attaches [parentJob] to this loop. May be called only while no parent job is set;
     * otherwise the `require` check fails.
     */
    fun initParentJob(parentJob: Job) {
        val current = this.parentJob
        require(current == null)
        this.parentJob = parentJob
    }
}
/**
 * An [EventLoopBase] whose completion flag is a plain mutable property that callers set directly.
 */
internal class BlockingEventLoop : EventLoopBase() {
// Starts as `false`; `public` widens the visibility of the protected member declared in EventLoopBase.
public override var isCompleted: Boolean = false
}
/** Returns the current time reading in nanoseconds, as reported by [getTimeNanos]. */
private fun nanoTime(): Long = getTimeNanos()
/** Wake-up hook invoked after a task was enqueued or scheduled; it has no effect in this file. */
private fun unpark() {
    // does nothing
}