Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 1 | /* |
Aurimas Liutikas | b4c2e14 | 2021-05-12 21:56:16 +0000 | [diff] [blame^] | 2 | * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
| 5 | package kotlinx.coroutines |
| 6 | |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 7 | import kotlinx.atomicfu.* |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 8 | import kotlinx.coroutines.internal.* |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 9 | import kotlin.coroutines.* |
| 10 | import kotlin.jvm.* |
Roman Elizarov | 60f8688 | 2019-12-17 19:14:52 +0300 | [diff] [blame] | 11 | import kotlin.native.concurrent.* |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 12 | |
/**
 * Base class for [CoroutineDispatcher] implementations that contain an event loop and can be
 * asked to process the next event from their event queue.
 *
 * An implementation may optionally implement the [Delay] interface to support time-scheduled tasks.
 * An event loop is created by, or piggybacked onto (see [ThreadLocalEventLoop]),
 * `runBlocking` and [Dispatchers.Unconfined].
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal abstract class EventLoop : CoroutineDispatcher() {
    /**
     * Combined counter of nested `runBlocking` and [Dispatchers.Unconfined] users of this event loop.
     * Each unconfined use contributes `1L shl 32` and every other use contributes `1` (see [delta]).
     */
    private var useCount = 0L

    /**
     * Becomes `true` as soon as [incrementUseCount] is called with `unconfined = false`
     * (i.e. on use by `runBlocking`), because that potentially leaks this loop to other threads and
     * the instance must then be properly shut down. A loop that was used solely by
     * [Dispatchers.Unconfined] does not need a shutdown -- it can stay in [ThreadLocalEventLoop]
     * and be reused next time.
     */
    private var shared = false

    /**
     * Queue of [Dispatchers.Unconfined] tasks, allocated on first [dispatchUnconfined].
     * These tasks are thread-local for performance and take precedence over the rest of the queue.
     */
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null

    /**
     * Processes the next event in this event loop.
     *
     * The result is to be interpreted as follows:
     * * `<= 0` -- there are potentially more events for immediate processing;
     * * `> 0` -- a number of nanoseconds to wait for the next scheduled event;
     * * [Long.MAX_VALUE] -- no more events.
     *
     * This base implementation only knows about unconfined tasks: it returns `0` when one was
     * executed and [Long.MAX_VALUE] otherwise.
     *
     * **NOTE**: Must be invoked only from the event loop's thread
     * (no check for performance reasons, may be added in the future).
     */
    public open fun processNextEvent(): Long =
        if (processUnconfinedEvent()) 0L else Long.MAX_VALUE

    /** `true` when this loop has nothing to process; here that means the unconfined queue is empty. */
    protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty

    /**
     * Time hint for the next event: `0` when an unconfined task is pending,
     * [Long.MAX_VALUE] when there is none.
     */
    protected open val nextTime: Long
        get() = if (isUnconfinedQueueEmpty) Long.MAX_VALUE else 0L

    /**
     * Runs the first pending unconfined task, if any.
     *
     * @return `true` if a task was taken from the unconfined queue and run, `false` if the queue
     * was never allocated or is empty.
     */
    public fun processUnconfinedEvent(): Boolean {
        val next = unconfinedQueue?.removeFirstOrNull() ?: return false
        next.run()
        return true
    }

    /**
     * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its
     * context parameter should call [processNextEvent] for this event loop (otherwise, it will process the
     * thread-local one). By default, an event loop implementation is thread-local and should not be processed
     * in the context (the current thread's event loop should be processed instead).
     */
    public open fun shouldBeProcessedFromContext(): Boolean = false

    /**
     * Dispatches a task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
     * into the current event loop by appending it to the unconfined queue.
     */
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        var queue = unconfinedQueue
        if (queue == null) {
            // first unconfined task for this loop -- allocate the queue lazily
            queue = ArrayQueue<DispatchedTask<*>>()
            unconfinedQueue = queue
        }
        queue.addLast(task)
    }

    /** `true` while at least one user (of either kind) holds this event loop. */
    public val isActive: Boolean
        get() = useCount > 0

    /** `true` while the use count is at least one unconfined [delta]. */
    public val isUnconfinedLoopActive: Boolean
        get() = useCount >= delta(unconfined = true)

    // May only be used from the event loop's thread
    public val isUnconfinedQueueEmpty: Boolean
        get() {
            val queue = unconfinedQueue
            return queue == null || queue.isEmpty
        }

    /** Amount by which [useCount] changes for a single use of the given kind. */
    private fun delta(unconfined: Boolean): Long =
        if (unconfined) 1L shl 32 else 1L

    /**
     * Registers one more user of this loop. A non-unconfined use also marks the loop as [shared].
     */
    fun incrementUseCount(unconfined: Boolean = false) {
        useCount += delta(unconfined)
        if (!unconfined) shared = true
    }

    /**
     * Unregisters one user of this loop. When the use count drops to zero and the loop was
     * ever marked as [shared], [shutdown] is invoked.
     */
    fun decrementUseCount(unconfined: Boolean = false) {
        useCount -= delta(unconfined)
        if (useCount <= 0) {
            assert { useCount == 0L } // "Extra decrementUseCount"
            // shut it down and remove from ThreadLocalEventLoop
            if (shared) shutdown()
        }
    }

    /** Hook invoked by [decrementUseCount] when a shared loop loses its last user; no-op by default. */
    protected open fun shutdown() {}
}
| 120 | |
/**
 * Holder of the current [EventLoop] reference, backed by a [CommonThreadLocal].
 */
@ThreadLocal
internal object ThreadLocalEventLoop {
    private val ref = CommonThreadLocal<EventLoop?>()

    /**
     * The event loop stored in [ref]; when none is stored yet, one is created with
     * [createEventLoop], stored, and returned.
     */
    internal val eventLoop: EventLoop
        get() {
            val existing = ref.get()
            if (existing != null) return existing
            val created = createEventLoop()
            ref.set(created)
            return created
        }

    /** Returns the stored event loop without creating one, or `null` when none is stored. */
    internal fun currentOrNull(): EventLoop? {
        return ref.get()
    }

    /** Clears the stored event loop reference. */
    internal fun resetEventLoop() {
        ref.set(null)
    }

    /** Replaces the stored event loop reference with [eventLoop]. */
    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}
| 139 | |
// Marker stored in DelayedTask._heap once the task has been disposed, so that it is never
// added to any heap again.
// NOTE(review): the symbol's name is "REMOVED_TASK" while the constant is DISPOSED_TASK --
// presumably a historical leftover; confirm before renaming since the string is visible at runtime.
@SharedImmutable
private val DISPOSED_TASK = Symbol("REMOVED_TASK")

// Result codes returned by scheduleImpl / DelayedTask.scheduleTask:
// SCHEDULE_OK        -- the task was added to the delayed queue
// SCHEDULE_COMPLETED -- the event loop is already completed; the caller reschedules the task elsewhere
// SCHEDULE_DISPOSED  -- the task was already disposed and was not added
private const val SCHEDULE_OK = 0
private const val SCHEDULE_COMPLETED = 1
private const val SCHEDULE_DISPOSED = 2
| 147 | |
/** Number of nanoseconds in one millisecond. */
private const val MS_TO_NS = 1_000_000L

/** Largest millisecond value that can be multiplied by [MS_TO_NS] without overflowing a [Long]. */
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
 * First-line overflow protection -- limits the maximal delay.
 * Delays longer than this one (~146 years) are considered to be delayed "forever".
 */
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2

/**
 * Converts a delay in milliseconds to nanoseconds with saturation.
 *
 * @param timeMillis delay in milliseconds; zero and negative values are treated as no delay.
 * @return `0` for non-positive input, [Long.MAX_VALUE] when the input is at least [MAX_MS]
 * (the product would not fit into a [Long]), and `timeMillis * 1_000_000` otherwise.
 */
internal fun delayToNanos(timeMillis: Long): Long {
    if (timeMillis <= 0) return 0L
    if (timeMillis >= MAX_MS) return Long.MAX_VALUE
    return timeMillis * MS_TO_NS
}

/**
 * Converts a delay in nanoseconds to whole milliseconds using integer division
 * (the fractional part is discarded).
 */
internal fun delayNanosToMillis(timeNanos: Long): Long {
    return timeNanos / MS_TO_NS
}
| 165 | |
// Marker stored in EventLoopImplBase._queue when the queue was closed while it held no tasks;
// enqueueImpl refuses new tasks and dequeue returns null once it is observed.
@SharedImmutable
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

// Short local name for the lock-free task queue used as the "full-blown" state of _queue.
private typealias Queue<T> = LockFreeTaskQueueCore<T>
| 170 | |
/**
 * Platform-specific part of the event loop implementation. Each platform supplies the `actual`
 * class; [EventLoopImplBase] extends it and relies on the two hooks declared here.
 */
internal expect abstract class EventLoopImplPlatform() : EventLoop {
    // Called to unpark this event loop's thread
    // (EventLoopImplBase invokes it after a task was enqueued or a delayed task became the first one)
    protected fun unpark()

    // Called to reschedule to DefaultExecutor when this event loop is complete
    // (`now` is the nanoTime() reading taken by the caller, `delayedTask` is the task to hand over)
    protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}
| 178 | |
/**
 * Event loop implementation that combines a lock-free queue of immediately runnable tasks with a
 * time-ordered queue of delayed tasks, and implements [Delay] on top of them.
 */
internal abstract class EventLoopImplBase : EventLoopImplPlatform(), Delay {
    // Holds one of: null | CLOSED_EMPTY | a single task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated lazily by scheduleImpl when the first delayed task is scheduled
    private val _delayed = atomic<DelayedTaskQueue?>(null)

    // Set by shutdown(); once true, enqueueImpl and scheduleImpl refuse new work
    private val _isCompleted = atomic(false)
    private var isCompleted
        get() = _isCompleted.value
        set(value) { _isCompleted.value = value }

    /**
     * `true` when the unconfined queue, the delayed queue and the task queue are all empty
     * (a closed-empty task queue counts as empty).
     */
    override val isEmpty: Boolean
        get() {
            if (!isUnconfinedQueueEmpty) return false
            if (_delayed.value?.isEmpty == false) return false
            val current = _queue.value
            return when {
                current == null -> true
                current is Queue<*> -> current.isEmpty
                else -> current === CLOSED_EMPTY
            }
        }

    /**
     * Time hint for the next event: `0` when something can run right now, [Long.MAX_VALUE] when
     * there is nothing to wait for, otherwise the non-negative number of nanoseconds until the
     * first delayed task is due.
     */
    protected override val nextTime: Long
        get() {
            if (super.nextTime == 0L) return 0L
            val current = _queue.value
            if (current != null) {
                if (current is Queue<*>) {
                    if (!current.isEmpty) return 0L // non-empty queue
                    // empty queue -- proceed to delayed tasks
                } else {
                    // CLOSED_EMPTY means no more events; anything else is a single pending task
                    return if (current === CLOSED_EMPTY) Long.MAX_VALUE else 0L
                }
            }
            val head = _delayed.value?.peek() ?: return Long.MAX_VALUE
            return (head.nanoTime - nanoTime()).coerceAtLeast(0L)
        }

    /**
     * Shuts this event loop down: detaches it from [ThreadLocalEventLoop], stops accepting tasks,
     * drains what is already queued, and hands the remaining delayed tasks over via `reschedule`.
     */
    override fun shutdown() {
        // This event loop is going away -- drop the thread-local reference to it first
        ThreadLocalEventLoop.resetEventLoop()
        // Signal that no more tasks are accepted; events that were added after the last
        // processNextEvent still have to be processed below
        isCompleted = true
        closeQueue()
        // Drain: keep processing while there is potentially more immediate work
        do {
            val waitNanos = processNextEvent()
        } while (waitNanos <= 0)
        // Whatever is still delayed gets rescheduled elsewhere
        rescheduleAllDelayed()
    }

    /**
     * Schedules [continuation] to be resumed after [timeMillis]. Delays that convert to
     * `MAX_DELAY_NS` or more are treated as "forever" and nothing is scheduled for them.
     */
    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val delayNanos = delayToNanos(timeMillis)
        if (delayNanos >= MAX_DELAY_NS) return
        val now = nanoTime()
        val task = DelayedResumeTask(now + delayNanos, continuation)
        // register disposal before scheduling, same order as scheduling requires
        continuation.disposeOnCancellation(task)
        schedule(now, task)
    }

    /**
     * Schedules [block] to run after [timeMillis] and returns a handle that disposes the scheduled task.
     * For delays that convert to `MAX_DELAY_NS` or more nothing is scheduled and
     * [NonDisposableHandle] is returned.
     */
    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val delayNanos = delayToNanos(timeMillis)
        if (delayNanos >= MAX_DELAY_NS) return NonDisposableHandle
        val now = nanoTime()
        val task = DelayedRunnableTask(now + delayNanos, block)
        schedule(now, task)
        return task
    }

    /**
     * Processes one event: an unconfined task if there is one, otherwise moves all due delayed
     * tasks into the task queue and runs the first queued task.
     *
     * @return `0` when a task was run, otherwise the value of [nextTime].
     */
    override fun processNextEvent(): Long {
        // unconfined events take priority
        if (processUnconfinedEvent()) return 0L
        // queue all delayed tasks that are due to be executed
        val delayedQueue = _delayed.value
        if (delayedQueue != null && !delayedQueue.isEmpty) {
            val now = nanoTime()
            do {
                // The task is enqueued from inside the removal predicate, so it is removed from the
                // delayed queue only after it was added to the task queue. That way `isEmpty` and
                // `nextTime`, which check both, never transiently see both of them empty during the move.
                val moved = delayedQueue.removeFirstIf { candidate ->
                    candidate.timeToExecute(now) && enqueueImpl(candidate)
                }
                // stop when nothing more is due or enqueueImpl refused the task because of completion
            } while (moved != null)
        }
        // then process one event from queue
        val next = dequeue() ?: return nextTime
        next.run()
        return 0L
    }

    /** Dispatches [block] by enqueueing it into this event loop; [context] is not used. */
    public final override fun dispatch(context: CoroutineContext, block: Runnable) {
        enqueue(block)
    }

    /**
     * Adds [task] to this loop's queue and unparks the loop's thread; when the loop no longer
     * accepts tasks, the task is passed to [DefaultExecutor] instead.
     */
    public fun enqueue(task: Runnable) {
        if (!enqueueImpl(task)) {
            DefaultExecutor.enqueue(task)
            return
        }
        // todo: we should unpark only when this delayed task became first in the queue
        unpark()
    }

    /**
     * Lock-free insertion of [task] into [_queue].
     *
     * @return `true` when the task was added, `false` when the loop is completed or the queue is closed.
     */
    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { current ->
            // fail fast if already completed, may still add, but queues will close
            if (isCompleted) return false
            when {
                // empty slot -- try to store the single task directly
                current == null -> if (_queue.compareAndSet(null, task)) return true
                current is Queue<*> -> {
                    when ((current as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        // queue is being replaced -- help to install its successor and retry
                        Queue.ADD_FROZEN -> _queue.compareAndSet(current, current.next())
                    }
                }
                current === CLOSED_EMPTY -> return false
                else -> {
                    // a single task is stored -- update to full-blown queue to add one more
                    val upgraded = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                    upgraded.addLast(current as Runnable)
                    upgraded.addLast(task)
                    if (_queue.compareAndSet(current, upgraded)) return true
                }
            }
        }
    }

    /**
     * Lock-free removal of the first task from [_queue].
     *
     * @return the removed task, or `null` when there is none (including the closed-empty state).
     */
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { current ->
            when {
                current == null -> return null
                current is Queue<*> -> {
                    val removed = (current as Queue<Runnable>).removeFirstOrNull()
                    if (removed !== Queue.REMOVE_FROZEN) return removed as Runnable?
                    // queue is being replaced -- help to install its successor and retry
                    _queue.compareAndSet(current, current.next())
                }
                current === CLOSED_EMPTY -> return null
                // a single task is stored -- take it by resetting the slot to null
                else -> if (_queue.compareAndSet(current, null)) return current as Runnable
            }
        }
    }

    /**
     * Transitions [_queue] out of its "open" single-slot states once the loop is completed:
     * an empty slot becomes `CLOSED_EMPTY`, an existing queue is closed, and a single stored task
     * is moved into a newly allocated queue.
     */
    private fun closeQueue() {
        assert { isCompleted }
        _queue.loop { current ->
            when {
                current == null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                current is Queue<*> -> {
                    current.close()
                    return
                }
                current === CLOSED_EMPTY -> return
                else -> {
                    // update to full-blown queue to close
                    val upgraded = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                    upgraded.addLast(current as Runnable)
                    if (_queue.compareAndSet(current, upgraded)) return
                }
            }
        }
    }

    /**
     * Schedules [delayedTask] using [now] as the current time reading.
     * Unparks the loop when the task became the first delayed one, reschedules it elsewhere when
     * this loop is already completed, and does nothing when the task was already disposed.
     */
    public fun schedule(now: Long, delayedTask: DelayedTask) {
        val outcome = scheduleImpl(now, delayedTask)
        when (outcome) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }

    /** `true` when [task] is currently the first task of the delayed queue. */
    private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task

    /**
     * Adds [delayedTask] to the delayed queue, allocating the queue on first use.
     *
     * @return one of `SCHEDULE_OK`, `SCHEDULE_COMPLETED` or `SCHEDULE_DISPOSED`.
     */
    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        var targetQueue = _delayed.value
        if (targetQueue == null) {
            // whoever wins the CAS installs the queue; everyone re-reads the installed instance
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            targetQueue = _delayed.value!!
        }
        return delayedTask.scheduleTask(now, targetQueue, this)
    }

    // It performs "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    private fun rescheduleAllDelayed() {
        val now = nanoTime()
        /*
         * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
         * synchronized on DelayedTask itself. All other operations are synchronized both on
         * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
         * first removes DelayedTask from the heap (under synchronization) and only then
         * assigns "_heap = DISPOSED_TASK", so there can never be a race on the _heap reference update.
         */
        var pending = _delayed.value?.removeFirstOrNull()
        while (pending != null) {
            reschedule(now, pending)
            pending = _delayed.value?.removeFirstOrNull()
        }
    }

    /**
     * A task scheduled for execution at [nanoTime]. It is a node of the delayed-task heap and
     * a [DisposableHandle] that removes itself from that heap on [dispose].
     */
    internal abstract class DelayedTask(
        /**
         * This field can be only modified in [scheduleTask] before putting this DelayedTask
         * into heap to avoid overflow and corruption of heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                // this can never happen, it is always checked before adding/removing
                require(_heap !== DISPOSED_TASK)
                _heap = value
            }

        override var index: Int = -1

        /** Orders tasks by the sign of the difference of their [nanoTime] values. */
        override fun compareTo(other: DelayedTask): Int {
            val diff = nanoTime - other.nanoTime
            return if (diff > 0) 1 else if (diff < 0) -1 else 0
        }

        /** `true` when this task is due at time [now]; compares via difference, not directly. */
        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        /**
         * Adds this task to [delayed] while keeping the [DelayedTaskQueue] invariant.
         *
         * @return `SCHEDULE_DISPOSED` when this task was already disposed, `SCHEDULE_COMPLETED` when
         * [eventLoop] is completed, `SCHEDULE_OK` otherwise.
         */
        @Synchronized
        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { head ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /**
                 * A new task is about to be added and the [DelayedTaskQueue] invariant has to hold.
                 * The code in this lambda is additionally executed under the lock of [DelayedTaskQueue],
                 * so working with [DelayedTaskQueue.timeNow] here is thread-safe.
                 */
                if (head == null) {
                    /**
                     * When adding the first delayed task, [DelayedTaskQueue.timeNow] is simply set to
                     * the current time even if that means "going backwards in time". This makes the
                     * structure self-correcting in spite of wild jumps in `nanoTime()` measurements
                     * once all delayed tasks are removed from the delayed queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /**
                     * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past the
                     * first task's time and only goes forward in time. It must not go backwards in
                     * time or the invariant can be violated for tasks that were already scheduled.
                     */
                    val headTime = head.nanoTime
                    // compute min(now, headTime) using a wrap-safe check
                    val earliest = if (headTime - now >= 0) now else headTime
                    // update timeNow only when going forward in time
                    if (earliest - delayed.timeNow > 0) delayed.timeNow = earliest
                }
                /**
                 * [DelayedTaskQueue.timeNow] may have just been modified, so double-check that the
                 * newly added task does not violate the [DelayedTaskQueue] invariant because of that.
                 * This function can also be called to reschedule from one queue to another, which is
                 * another reason why the new task's time might now violate the invariant.
                 * A violation (if any) is corrected by simply changing this task's time to now.
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }

        /**
         * Removes this task from the heap it is in (if any) and marks it as disposed so that it is
         * never added to any heap again. Subsequent calls do nothing.
         */
        @Synchronized
        final override fun dispose() {
            val current = _heap
            if (current === DISPOSED_TASK) return // already disposed
            if (current is DelayedTaskQueue) current.remove(this) // remove if it is in heap (first)
            _heap = DISPOSED_TASK // never add again to any heap
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    /** Delayed task that resumes [cont] with [Unit] via `resumeUndispatched` when run. */
    private inner class DelayedResumeTask(
        nanoTime: Long,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(nanoTime) {
        override fun run() {
            with(cont) { resumeUndispatched(Unit) }
        }

        override fun toString(): String = "${super.toString()}$cont"
    }

    /** Delayed task that runs the given [block] when run. */
    private class DelayedRunnableTask(
        nanoTime: Long,
        private val block: Runnable
    ) : DelayedTask(nanoTime) {
        override fun run() {
            block.run()
        }

        override fun toString(): String = "${super.toString()}$block"
    }

    /**
     * Delayed task queue maintains a stable time-comparison invariant despite potential wraparounds in
     * long nano time measurements by maintaining the last observed [timeNow]. It protects the integrity of
     * the heap data structure in spite of potential non-monotonicity of the `nanoTime()` source.
     * The invariant is that for every scheduled [DelayedTask]:
     *
     * ```
     * delayedTask.nanoTime - timeNow >= 0
     * ```
     *
     * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
     * scheduled [DelayedTask.nanoTime] values can be at most [Long.MAX_VALUE] apart. This invariant is
     * maintained when new tasks are added by the [DelayedTask.scheduleTask] function and it cannot be
     * violated when tasks are removed (so there is nothing special to do there).
     */
    internal class DelayedTaskQueue(
        @JvmField var timeNow: Long
    ) : ThreadSafeHeap<DelayedTask>()
}
| 520 | |
// Platform-specific factory of event loops; ThreadLocalEventLoop.eventLoop calls it when
// no event loop is stored yet.
internal expect fun createEventLoop(): EventLoop
| 522 | |
// Platform-specific time source in nanoseconds used for delayed tasks. Readings are compared
// via differences (see DelayedTask.timeToExecute), since the value may wrap around or jump.
internal expect fun nanoTime(): Long
| 524 | |
// Platform-specific fallback executor: EventLoopImplBase.enqueue passes a task here when the
// event loop itself no longer accepts tasks.
internal expect object DefaultExecutor {
    public fun enqueue(task: Runnable)
}
| 528 | |