Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 1 | /* |
Aurimas Liutikas | 79e555e | 2021-05-17 17:41:41 +0000 | [diff] [blame] | 2 | * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
| 5 | package kotlinx.coroutines |
| 6 | |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 7 | import kotlinx.atomicfu.* |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 8 | import kotlinx.coroutines.internal.* |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 9 | import kotlin.coroutines.* |
| 10 | import kotlin.jvm.* |
Roman Elizarov | 60f8688 | 2019-12-17 19:14:52 +0300 | [diff] [blame] | 11 | import kotlin.native.concurrent.* |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 12 | |
| 13 | /** |
| 14 | * Extended by [CoroutineDispatcher] implementations that have event loop inside and can |
| 15 | * be asked to process next event from their event queue. |
| 16 | * |
| 17 | * It may optionally implement [Delay] interface and support time-scheduled tasks. |
| 18 | * It is created or pigged back onto (see [ThreadLocalEventLoop]) |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 19 | * by `runBlocking` and by [Dispatchers.Unconfined]. |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 20 | * |
| 21 | * @suppress **This an internal API and should not be used from general code.** |
| 22 | */ |
| 23 | internal abstract class EventLoop : CoroutineDispatcher() { |
| 24 | /** |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 25 | * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop. |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 26 | */ |
| 27 | private var useCount = 0L |
| 28 | |
| 29 | /** |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 30 | * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 31 | * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely |
| 32 | * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time. |
| 33 | */ |
| 34 | private var shared = false |
| 35 | |
| 36 | /** |
| 37 | * Queue used by [Dispatchers.Unconfined] tasks. |
| 38 | * These tasks are thread-local for performance and take precedence over the rest of the queue. |
| 39 | */ |
| 40 | private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null |
| 41 | |
| 42 | /** |
| 43 | * Processes next event in this event loop. |
| 44 | * |
| 45 | * The result of this function is to be interpreted like this: |
| 46 | * * `<= 0` -- there are potentially more events for immediate processing; |
| 47 | * * `> 0` -- a number of nanoseconds to wait for next scheduled event; |
| 48 | * * [Long.MAX_VALUE] -- no more events. |
| 49 | * |
| 50 | * **NOTE**: Must be invoked only from the event loop's thread |
| 51 | * (no check for performance reasons, may be added in the future). |
| 52 | */ |
| 53 | public open fun processNextEvent(): Long { |
| 54 | if (!processUnconfinedEvent()) return Long.MAX_VALUE |
dkhalanskyjb | 96b41b4 | 2020-03-30 18:54:09 +0300 | [diff] [blame] | 55 | return 0 |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 56 | } |
| 57 | |
| 58 | protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty |
| 59 | |
| 60 | protected open val nextTime: Long |
| 61 | get() { |
| 62 | val queue = unconfinedQueue ?: return Long.MAX_VALUE |
| 63 | return if (queue.isEmpty) Long.MAX_VALUE else 0L |
| 64 | } |
| 65 | |
| 66 | public fun processUnconfinedEvent(): Boolean { |
| 67 | val queue = unconfinedQueue ?: return false |
| 68 | val task = queue.removeFirstOrNull() ?: return false |
| 69 | task.run() |
| 70 | return true |
| 71 | } |
| 72 | /** |
| 73 | * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context |
| 74 | * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). |
| 75 | * By default, event loop implementation is thread-local and should not processed in the context |
| 76 | * (current thread's event loop should be processed instead). |
| 77 | */ |
| 78 | public open fun shouldBeProcessedFromContext(): Boolean = false |
| 79 | |
| 80 | /** |
| 81 | * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] |
| 82 | * into the current event loop. |
| 83 | */ |
| 84 | public fun dispatchUnconfined(task: DispatchedTask<*>) { |
| 85 | val queue = unconfinedQueue ?: |
| 86 | ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it } |
| 87 | queue.addLast(task) |
| 88 | } |
| 89 | |
| 90 | public val isActive: Boolean |
| 91 | get() = useCount > 0 |
| 92 | |
| 93 | public val isUnconfinedLoopActive: Boolean |
| 94 | get() = useCount >= delta(unconfined = true) |
| 95 | |
| 96 | // May only be used from the event loop's thread |
| 97 | public val isUnconfinedQueueEmpty: Boolean |
| 98 | get() = unconfinedQueue?.isEmpty ?: true |
| 99 | |
| 100 | private fun delta(unconfined: Boolean) = |
| 101 | if (unconfined) (1L shl 32) else 1L |
| 102 | |
| 103 | fun incrementUseCount(unconfined: Boolean = false) { |
| 104 | useCount += delta(unconfined) |
| 105 | if (!unconfined) shared = true |
| 106 | } |
| 107 | |
| 108 | fun decrementUseCount(unconfined: Boolean = false) { |
| 109 | useCount -= delta(unconfined) |
| 110 | if (useCount > 0) return |
Roman Elizarov | 583d39d | 2019-07-02 16:21:22 +0300 | [diff] [blame] | 111 | assert { useCount == 0L } // "Extra decrementUseCount" |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 112 | if (shared) { |
| 113 | // shut it down and remove from ThreadLocalEventLoop |
| 114 | shutdown() |
| 115 | } |
| 116 | } |
| 117 | |
Steve Elliott | ca095be | 2022-07-25 14:26:10 +0000 | [diff] [blame] | 118 | final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { |
| 119 | parallelism.checkParallelism() |
| 120 | return this |
| 121 | } |
| 122 | |
| 123 | open fun shutdown() {} |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 124 | } |
| 125 | |
Roman Elizarov | 12a0318 | 2019-12-19 11:09:09 +0300 | [diff] [blame] | 126 | @ThreadLocal |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 127 | internal object ThreadLocalEventLoop { |
| 128 | private val ref = CommonThreadLocal<EventLoop?>() |
| 129 | |
| 130 | internal val eventLoop: EventLoop |
| 131 | get() = ref.get() ?: createEventLoop().also { ref.set(it) } |
| 132 | |
| 133 | internal fun currentOrNull(): EventLoop? = |
| 134 | ref.get() |
| 135 | |
| 136 | internal fun resetEventLoop() { |
| 137 | ref.set(null) |
| 138 | } |
| 139 | |
| 140 | internal fun setEventLoop(eventLoop: EventLoop) { |
| 141 | ref.set(eventLoop) |
| 142 | } |
| 143 | } |
| 144 | |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 145 | @SharedImmutable |
| 146 | private val DISPOSED_TASK = Symbol("REMOVED_TASK") |
| 147 | |
| 148 | // results for scheduleImpl |
| 149 | private const val SCHEDULE_OK = 0 |
| 150 | private const val SCHEDULE_COMPLETED = 1 |
| 151 | private const val SCHEDULE_DISPOSED = 2 |
| 152 | |
| 153 | private const val MS_TO_NS = 1_000_000L |
| 154 | private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS |
| 155 | |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 156 | /** |
| 157 | * First-line overflow protection -- limit maximal delay. |
| 158 | * Delays longer than this one (~146 years) are considered to be delayed "forever". |
| 159 | */ |
| 160 | private const val MAX_DELAY_NS = Long.MAX_VALUE / 2 |
| 161 | |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 162 | internal fun delayToNanos(timeMillis: Long): Long = when { |
| 163 | timeMillis <= 0 -> 0L |
| 164 | timeMillis >= MAX_MS -> Long.MAX_VALUE |
| 165 | else -> timeMillis * MS_TO_NS |
| 166 | } |
| 167 | |
| 168 | internal fun delayNanosToMillis(timeNanos: Long): Long = |
| 169 | timeNanos / MS_TO_NS |
| 170 | |
| 171 | @SharedImmutable |
| 172 | private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY") |
| 173 | |
| 174 | private typealias Queue<T> = LockFreeTaskQueueCore<T> |
| 175 | |
| 176 | internal expect abstract class EventLoopImplPlatform() : EventLoop { |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 177 | // Called to unpark this event loop's thread |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 178 | protected fun unpark() |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 179 | |
| 180 | // Called to reschedule to DefaultExecutor when this event loop is complete |
| 181 | protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 182 | } |
| 183 | |
| 184 | internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { |
| 185 | // null | CLOSED_EMPTY | task | Queue<Runnable> |
| 186 | private val _queue = atomic<Any?>(null) |
| 187 | |
| 188 | // Allocated only only once |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 189 | private val _delayed = atomic<DelayedTaskQueue?>(null) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 190 | |
Roman Elizarov | 8f39109 | 2019-09-24 10:54:47 +0300 | [diff] [blame] | 191 | private val _isCompleted = atomic(false) |
| 192 | private var isCompleted |
| 193 | get() = _isCompleted.value |
| 194 | set(value) { _isCompleted.value = value } |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 195 | |
| 196 | override val isEmpty: Boolean get() { |
| 197 | if (!isUnconfinedQueueEmpty) return false |
| 198 | val delayed = _delayed.value |
| 199 | if (delayed != null && !delayed.isEmpty) return false |
Roman Elizarov | 8f39109 | 2019-09-24 10:54:47 +0300 | [diff] [blame] | 200 | return when (val queue = _queue.value) { |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 201 | null -> true |
| 202 | is Queue<*> -> queue.isEmpty |
| 203 | else -> queue === CLOSED_EMPTY |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | protected override val nextTime: Long |
| 208 | get() { |
| 209 | if (super.nextTime == 0L) return 0L |
| 210 | val queue = _queue.value |
| 211 | when { |
| 212 | queue === null -> {} // empty queue -- proceed |
| 213 | queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue |
| 214 | queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed |
| 215 | else -> return 0 // non-empty queue |
| 216 | } |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 217 | val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 218 | return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0) |
| 219 | } |
| 220 | |
| 221 | override fun shutdown() { |
| 222 | // Clean up thread-local reference here -- this event loop is shutting down |
| 223 | ThreadLocalEventLoop.resetEventLoop() |
| 224 | // We should signal that this event loop should not accept any more tasks |
| 225 | // and process queued events (that could have been added after last processNextEvent) |
| 226 | isCompleted = true |
| 227 | closeQueue() |
| 228 | // complete processing of all queued tasks |
| 229 | while (processNextEvent() <= 0) { /* spin */ } |
| 230 | // reschedule the rest of delayed tasks |
| 231 | rescheduleAllDelayed() |
| 232 | } |
| 233 | |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 234 | public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { |
| 235 | val timeNanos = delayToNanos(timeMillis) |
| 236 | if (timeNanos < MAX_DELAY_NS) { |
| 237 | val now = nanoTime() |
| 238 | DelayedResumeTask(now + timeNanos, continuation).also { task -> |
Steve Elliott | ca095be | 2022-07-25 14:26:10 +0000 | [diff] [blame] | 239 | /* |
| 240 | * Order is important here: first we schedule the heap and only then |
| 241 | * publish it to continuation. Otherwise, `DelayedResumeTask` would |
| 242 | * have to know how to be disposed of even when it wasn't scheduled yet. |
| 243 | */ |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 244 | schedule(now, task) |
Steve Elliott | ca095be | 2022-07-25 14:26:10 +0000 | [diff] [blame] | 245 | continuation.disposeOnCancellation(task) |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 246 | } |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { |
| 251 | val timeNanos = delayToNanos(timeMillis) |
| 252 | return if (timeNanos < MAX_DELAY_NS) { |
| 253 | val now = nanoTime() |
| 254 | DelayedRunnableTask(now + timeNanos, block).also { task -> |
| 255 | schedule(now, task) |
| 256 | } |
| 257 | } else { |
| 258 | NonDisposableHandle |
| 259 | } |
| 260 | } |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 261 | |
| 262 | override fun processNextEvent(): Long { |
| 263 | // unconfined events take priority |
dkhalanskyjb | 96b41b4 | 2020-03-30 18:54:09 +0300 | [diff] [blame] | 264 | if (processUnconfinedEvent()) return 0 |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 265 | // queue all delayed tasks that are due to be executed |
| 266 | val delayed = _delayed.value |
| 267 | if (delayed != null && !delayed.isEmpty) { |
| 268 | val now = nanoTime() |
| 269 | while (true) { |
| 270 | // make sure that moving from delayed to queue removes from delayed only after it is added to queue |
| 271 | // to make sure that 'isEmpty' and `nextTime` that check both of them |
| 272 | // do not transiently report that both delayed and queue are empty during move |
| 273 | delayed.removeFirstIf { |
| 274 | if (it.timeToExecute(now)) { |
| 275 | enqueueImpl(it) |
| 276 | } else |
| 277 | false |
| 278 | } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete" |
| 279 | } |
| 280 | } |
| 281 | // then process one event from queue |
dkhalanskyjb | 96b41b4 | 2020-03-30 18:54:09 +0300 | [diff] [blame] | 282 | val task = dequeue() |
| 283 | if (task != null) { |
Steve Elliott | ca095be | 2022-07-25 14:26:10 +0000 | [diff] [blame] | 284 | platformAutoreleasePool { task.run() } |
dkhalanskyjb | 96b41b4 | 2020-03-30 18:54:09 +0300 | [diff] [blame] | 285 | return 0 |
| 286 | } |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 287 | return nextTime |
| 288 | } |
| 289 | |
| 290 | public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) |
| 291 | |
Steve Elliott | ca095be | 2022-07-25 14:26:10 +0000 | [diff] [blame] | 292 | open fun enqueue(task: Runnable) { |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 293 | if (enqueueImpl(task)) { |
| 294 | // todo: we should unpark only when this delayed task became first in the queue |
| 295 | unpark() |
| 296 | } else { |
| 297 | DefaultExecutor.enqueue(task) |
| 298 | } |
| 299 | } |
| 300 | |
| 301 | @Suppress("UNCHECKED_CAST") |
| 302 | private fun enqueueImpl(task: Runnable): Boolean { |
| 303 | _queue.loop { queue -> |
| 304 | if (isCompleted) return false // fail fast if already completed, may still add, but queues will close |
| 305 | when (queue) { |
| 306 | null -> if (_queue.compareAndSet(null, task)) return true |
| 307 | is Queue<*> -> { |
| 308 | when ((queue as Queue<Runnable>).addLast(task)) { |
| 309 | Queue.ADD_SUCCESS -> return true |
| 310 | Queue.ADD_CLOSED -> return false |
| 311 | Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next()) |
| 312 | } |
| 313 | } |
| 314 | else -> when { |
| 315 | queue === CLOSED_EMPTY -> return false |
| 316 | else -> { |
| 317 | // update to full-blown queue to add one more |
| 318 | val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true) |
| 319 | newQueue.addLast(queue as Runnable) |
| 320 | newQueue.addLast(task) |
| 321 | if (_queue.compareAndSet(queue, newQueue)) return true |
| 322 | } |
| 323 | } |
| 324 | } |
| 325 | } |
| 326 | } |
| 327 | |
| 328 | @Suppress("UNCHECKED_CAST") |
| 329 | private fun dequeue(): Runnable? { |
| 330 | _queue.loop { queue -> |
| 331 | when (queue) { |
| 332 | null -> return null |
| 333 | is Queue<*> -> { |
| 334 | val result = (queue as Queue<Runnable>).removeFirstOrNull() |
| 335 | if (result !== Queue.REMOVE_FROZEN) return result as Runnable? |
| 336 | _queue.compareAndSet(queue, queue.next()) |
| 337 | } |
| 338 | else -> when { |
| 339 | queue === CLOSED_EMPTY -> return null |
| 340 | else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable |
| 341 | } |
| 342 | } |
| 343 | } |
| 344 | } |
| 345 | |
| 346 | private fun closeQueue() { |
| 347 | assert { isCompleted } |
| 348 | _queue.loop { queue -> |
| 349 | when (queue) { |
| 350 | null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return |
| 351 | is Queue<*> -> { |
| 352 | queue.close() |
| 353 | return |
| 354 | } |
| 355 | else -> when { |
| 356 | queue === CLOSED_EMPTY -> return |
| 357 | else -> { |
| 358 | // update to full-blown queue to close |
| 359 | val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true) |
| 360 | newQueue.addLast(queue as Runnable) |
| 361 | if (_queue.compareAndSet(queue, newQueue)) return |
| 362 | } |
| 363 | } |
| 364 | } |
| 365 | } |
| 366 | |
| 367 | } |
| 368 | |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 369 | public fun schedule(now: Long, delayedTask: DelayedTask) { |
| 370 | when (scheduleImpl(now, delayedTask)) { |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 371 | SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark() |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 372 | SCHEDULE_COMPLETED -> reschedule(now, delayedTask) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 373 | SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed |
| 374 | else -> error("unexpected result") |
| 375 | } |
| 376 | } |
| 377 | |
| 378 | private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task |
| 379 | |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 380 | private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int { |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 381 | if (isCompleted) return SCHEDULE_COMPLETED |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 382 | val delayedQueue = _delayed.value ?: run { |
| 383 | _delayed.compareAndSet(null, DelayedTaskQueue(now)) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 384 | _delayed.value!! |
| 385 | } |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 386 | return delayedTask.scheduleTask(now, delayedQueue, this) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 387 | } |
| 388 | |
| 389 | // It performs "hard" shutdown for test cleanup purposes |
| 390 | protected fun resetAll() { |
| 391 | _queue.value = null |
| 392 | _delayed.value = null |
| 393 | } |
| 394 | |
| 395 | // This is a "soft" (normal) shutdown |
| 396 | private fun rescheduleAllDelayed() { |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 397 | val now = nanoTime() |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 398 | while (true) { |
| 399 | /* |
| 400 | * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not |
| 401 | * synchronized on DelayedTask itself. All other operation are synchronized both on |
| 402 | * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose` |
| 403 | * first removes DelayedTask from the heap (under synchronization) then |
| 404 | * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update. |
| 405 | */ |
| 406 | val delayedTask = _delayed.value?.removeFirstOrNull() ?: break |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 407 | reschedule(now, delayedTask) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 408 | } |
| 409 | } |
| 410 | |
| 411 | internal abstract class DelayedTask( |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 412 | /** |
| 413 | * This field can be only modified in [scheduleTask] before putting this DelayedTask |
| 414 | * into heap to avoid overflow and corruption of heap data structure. |
| 415 | */ |
| 416 | @JvmField var nanoTime: Long |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 417 | ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode { |
Steve Elliott | ca095be | 2022-07-25 14:26:10 +0000 | [diff] [blame] | 418 | @Volatile |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 419 | private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK |
| 420 | |
| 421 | override var heap: ThreadSafeHeap<*>? |
| 422 | get() = _heap as? ThreadSafeHeap<*> |
| 423 | set(value) { |
| 424 | require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing |
| 425 | _heap = value |
| 426 | } |
| 427 | |
| 428 | override var index: Int = -1 |
| 429 | |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 430 | override fun compareTo(other: DelayedTask): Int { |
| 431 | val dTime = nanoTime - other.nanoTime |
| 432 | return when { |
| 433 | dTime > 0 -> 1 |
| 434 | dTime < 0 -> -1 |
| 435 | else -> 0 |
| 436 | } |
| 437 | } |
| 438 | |
| 439 | fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L |
| 440 | |
| 441 | @Synchronized |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 442 | fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int { |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 443 | if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 444 | delayed.addLastIf(this) { firstTask -> |
| 445 | if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask |
| 446 | /** |
| 447 | * We are about to add new task and we have to make sure that [DelayedTaskQueue] |
| 448 | * invariant is maintained. The code in this lambda is additionally executed under |
| 449 | * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe. |
| 450 | */ |
| 451 | if (firstTask == null) { |
| 452 | /** |
| 453 | * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to |
| 454 | * the current now time even if that means "going backwards in time". This makes the structure |
| 455 | * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks |
| 456 | * are removed from the delayed queue for execution. |
| 457 | */ |
| 458 | delayed.timeNow = now |
| 459 | } else { |
| 460 | /** |
| 461 | * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time |
| 462 | * and only goes forward in time. We cannot let it go backwards in time or invariant can be |
| 463 | * violated for tasks that were already scheduled. |
| 464 | */ |
| 465 | val firstTime = firstTask.nanoTime |
| 466 | // compute min(now, firstTime) using a wrap-safe check |
| 467 | val minTime = if (firstTime - now >= 0) now else firstTime |
| 468 | // update timeNow only when going forward in time |
| 469 | if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime |
| 470 | } |
| 471 | /** |
| 472 | * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added |
| 473 | * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask |
| 474 | * function can be called to reschedule from one queue to another and this might be another reason |
| 475 | * where new task's time might now violate invariant. |
| 476 | * We correct invariant violation (if any) by simply changing this task's time to now. |
| 477 | */ |
| 478 | if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow |
| 479 | true |
| 480 | } |
| 481 | return SCHEDULE_OK |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 482 | } |
| 483 | |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 484 | @Synchronized |
| 485 | final override fun dispose() { |
| 486 | val heap = _heap |
| 487 | if (heap === DISPOSED_TASK) return // already disposed |
| 488 | @Suppress("UNCHECKED_CAST") |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 489 | (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 490 | _heap = DISPOSED_TASK // never add again to any heap |
| 491 | } |
| 492 | |
| 493 | override fun toString(): String = "Delayed[nanos=$nanoTime]" |
| 494 | } |
| 495 | |
| 496 | private inner class DelayedResumeTask( |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 497 | nanoTime: Long, |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 498 | private val cont: CancellableContinuation<Unit> |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 499 | ) : DelayedTask(nanoTime) { |
| 500 | override fun run() { with(cont) { resumeUndispatched(Unit) } } |
| 501 | override fun toString(): String = super.toString() + cont.toString() |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 502 | } |
| 503 | |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 504 | private class DelayedRunnableTask( |
| 505 | nanoTime: Long, |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 506 | private val block: Runnable |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 507 | ) : DelayedTask(nanoTime) { |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 508 | override fun run() { block.run() } |
| 509 | override fun toString(): String = super.toString() + block.toString() |
| 510 | } |
Roman Elizarov | 4809393 | 2019-07-06 13:13:32 +0300 | [diff] [blame] | 511 | |
| 512 | /** |
| 513 | * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in |
| 514 | * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the |
| 515 | * heap data structure in spite of potential non-monotonicity of `nanoTime()` source. |
| 516 | * The invariant is that for every scheduled [DelayedTask]: |
| 517 | * |
| 518 | * ``` |
| 519 | * delayedTask.nanoTime - timeNow >= 0 |
| 520 | * ``` |
| 521 | * |
| 522 | * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as |
| 523 | * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when |
| 524 | * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed |
| 525 | * (so there is nothing special to do there). |
| 526 | */ |
| 527 | internal class DelayedTaskQueue( |
| 528 | @JvmField var timeNow: Long |
| 529 | ) : ThreadSafeHeap<DelayedTask>() |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 530 | } |
| 531 | |
Roman Elizarov | 5173824 | 2018-12-21 16:41:39 +0300 | [diff] [blame] | 532 | internal expect fun createEventLoop(): EventLoop |
| 533 | |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 534 | internal expect fun nanoTime(): Long |
| 535 | |
| 536 | internal expect object DefaultExecutor { |
| 537 | public fun enqueue(task: Runnable) |
Roman Elizarov | 98a9705 | 2019-07-02 15:30:47 +0300 | [diff] [blame] | 538 | } |
| 539 | |
Steve Elliott | ca095be | 2022-07-25 14:26:10 +0000 | [diff] [blame] | 540 | /** |
| 541 | * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and |
| 542 | * non-Darwin native targets. |
| 543 | * |
| 544 | * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to |
| 545 | * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must |
| 546 | * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic |
| 547 | * pool management, it must manage the pool creation and pool drainage manually. |
| 548 | */ |
| 549 | internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit) |