blob: 12940c54e2701e60bea4924aabb26bcda1fce234 [file] [log] [blame]
Roman Elizarov51738242018-12-21 16:41:39 +03001/*
Aurimas Liutikas79e555e2021-05-17 17:41:41 +00002 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov51738242018-12-21 16:41:39 +03003 */
4
5package kotlinx.coroutines
6
Roman Elizarov98a97052019-07-02 15:30:47 +03007import kotlinx.atomicfu.*
Roman Elizarov51738242018-12-21 16:41:39 +03008import kotlinx.coroutines.internal.*
Roman Elizarov98a97052019-07-02 15:30:47 +03009import kotlin.coroutines.*
10import kotlin.jvm.*
Roman Elizarov60f86882019-12-17 19:14:52 +030011import kotlin.native.concurrent.*
Roman Elizarov51738242018-12-21 16:41:39 +030012
13/**
14 * Extended by [CoroutineDispatcher] implementations that have event loop inside and can
15 * be asked to process next event from their event queue.
16 *
17 * It may optionally implement [Delay] interface and support time-scheduled tasks.
18 * It is created or pigged back onto (see [ThreadLocalEventLoop])
Roman Elizarov48093932019-07-06 13:13:32 +030019 * by `runBlocking` and by [Dispatchers.Unconfined].
Roman Elizarov51738242018-12-21 16:41:39 +030020 *
21 * @suppress **This an internal API and should not be used from general code.**
22 */
23internal abstract class EventLoop : CoroutineDispatcher() {
24 /**
Roman Elizarov48093932019-07-06 13:13:32 +030025 * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
Roman Elizarov51738242018-12-21 16:41:39 +030026 */
27 private var useCount = 0L
28
29 /**
Roman Elizarov48093932019-07-06 13:13:32 +030030 * Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
Roman Elizarov51738242018-12-21 16:41:39 +030031 * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
32 * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
33 */
34 private var shared = false
35
36 /**
37 * Queue used by [Dispatchers.Unconfined] tasks.
38 * These tasks are thread-local for performance and take precedence over the rest of the queue.
39 */
40 private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
41
42 /**
43 * Processes next event in this event loop.
44 *
45 * The result of this function is to be interpreted like this:
46 * * `<= 0` -- there are potentially more events for immediate processing;
47 * * `> 0` -- a number of nanoseconds to wait for next scheduled event;
48 * * [Long.MAX_VALUE] -- no more events.
49 *
50 * **NOTE**: Must be invoked only from the event loop's thread
51 * (no check for performance reasons, may be added in the future).
52 */
53 public open fun processNextEvent(): Long {
54 if (!processUnconfinedEvent()) return Long.MAX_VALUE
dkhalanskyjb96b41b42020-03-30 18:54:09 +030055 return 0
Roman Elizarov51738242018-12-21 16:41:39 +030056 }
57
58 protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
59
60 protected open val nextTime: Long
61 get() {
62 val queue = unconfinedQueue ?: return Long.MAX_VALUE
63 return if (queue.isEmpty) Long.MAX_VALUE else 0L
64 }
65
66 public fun processUnconfinedEvent(): Boolean {
67 val queue = unconfinedQueue ?: return false
68 val task = queue.removeFirstOrNull() ?: return false
69 task.run()
70 return true
71 }
72 /**
73 * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
74 * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
75 * By default, event loop implementation is thread-local and should not processed in the context
76 * (current thread's event loop should be processed instead).
77 */
78 public open fun shouldBeProcessedFromContext(): Boolean = false
79
80 /**
81 * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
82 * into the current event loop.
83 */
84 public fun dispatchUnconfined(task: DispatchedTask<*>) {
85 val queue = unconfinedQueue ?:
86 ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
87 queue.addLast(task)
88 }
89
90 public val isActive: Boolean
91 get() = useCount > 0
92
93 public val isUnconfinedLoopActive: Boolean
94 get() = useCount >= delta(unconfined = true)
95
96 // May only be used from the event loop's thread
97 public val isUnconfinedQueueEmpty: Boolean
98 get() = unconfinedQueue?.isEmpty ?: true
99
100 private fun delta(unconfined: Boolean) =
101 if (unconfined) (1L shl 32) else 1L
102
103 fun incrementUseCount(unconfined: Boolean = false) {
104 useCount += delta(unconfined)
105 if (!unconfined) shared = true
106 }
107
108 fun decrementUseCount(unconfined: Boolean = false) {
109 useCount -= delta(unconfined)
110 if (useCount > 0) return
Roman Elizarov583d39d2019-07-02 16:21:22 +0300111 assert { useCount == 0L } // "Extra decrementUseCount"
Roman Elizarov51738242018-12-21 16:41:39 +0300112 if (shared) {
113 // shut it down and remove from ThreadLocalEventLoop
114 shutdown()
115 }
116 }
117
Steve Elliottca095be2022-07-25 14:26:10 +0000118 final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
119 parallelism.checkParallelism()
120 return this
121 }
122
123 open fun shutdown() {}
Roman Elizarov51738242018-12-21 16:41:39 +0300124}
125
Roman Elizarov12a03182019-12-19 11:09:09 +0300126@ThreadLocal
Roman Elizarov51738242018-12-21 16:41:39 +0300127internal object ThreadLocalEventLoop {
128 private val ref = CommonThreadLocal<EventLoop?>()
129
130 internal val eventLoop: EventLoop
131 get() = ref.get() ?: createEventLoop().also { ref.set(it) }
132
133 internal fun currentOrNull(): EventLoop? =
134 ref.get()
135
136 internal fun resetEventLoop() {
137 ref.set(null)
138 }
139
140 internal fun setEventLoop(eventLoop: EventLoop) {
141 ref.set(eventLoop)
142 }
143}
144
Roman Elizarov98a97052019-07-02 15:30:47 +0300145@SharedImmutable
146private val DISPOSED_TASK = Symbol("REMOVED_TASK")
147
148// results for scheduleImpl
149private const val SCHEDULE_OK = 0
150private const val SCHEDULE_COMPLETED = 1
151private const val SCHEDULE_DISPOSED = 2
152
153private const val MS_TO_NS = 1_000_000L
154private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
155
Roman Elizarov48093932019-07-06 13:13:32 +0300156/**
157 * First-line overflow protection -- limit maximal delay.
158 * Delays longer than this one (~146 years) are considered to be delayed "forever".
159 */
160private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
161
Roman Elizarov98a97052019-07-02 15:30:47 +0300162internal fun delayToNanos(timeMillis: Long): Long = when {
163 timeMillis <= 0 -> 0L
164 timeMillis >= MAX_MS -> Long.MAX_VALUE
165 else -> timeMillis * MS_TO_NS
166}
167
168internal fun delayNanosToMillis(timeNanos: Long): Long =
169 timeNanos / MS_TO_NS
170
171@SharedImmutable
172private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
173
174private typealias Queue<T> = LockFreeTaskQueueCore<T>
175
176internal expect abstract class EventLoopImplPlatform() : EventLoop {
Roman Elizarov48093932019-07-06 13:13:32 +0300177 // Called to unpark this event loop's thread
Roman Elizarov98a97052019-07-02 15:30:47 +0300178 protected fun unpark()
Roman Elizarov48093932019-07-06 13:13:32 +0300179
180 // Called to reschedule to DefaultExecutor when this event loop is complete
181 protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
Roman Elizarov98a97052019-07-02 15:30:47 +0300182}
183
184internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
185 // null | CLOSED_EMPTY | task | Queue<Runnable>
186 private val _queue = atomic<Any?>(null)
187
188 // Allocated only only once
Roman Elizarov48093932019-07-06 13:13:32 +0300189 private val _delayed = atomic<DelayedTaskQueue?>(null)
Roman Elizarov98a97052019-07-02 15:30:47 +0300190
Roman Elizarov8f391092019-09-24 10:54:47 +0300191 private val _isCompleted = atomic(false)
192 private var isCompleted
193 get() = _isCompleted.value
194 set(value) { _isCompleted.value = value }
Roman Elizarov98a97052019-07-02 15:30:47 +0300195
196 override val isEmpty: Boolean get() {
197 if (!isUnconfinedQueueEmpty) return false
198 val delayed = _delayed.value
199 if (delayed != null && !delayed.isEmpty) return false
Roman Elizarov8f391092019-09-24 10:54:47 +0300200 return when (val queue = _queue.value) {
Roman Elizarov98a97052019-07-02 15:30:47 +0300201 null -> true
202 is Queue<*> -> queue.isEmpty
203 else -> queue === CLOSED_EMPTY
204 }
205 }
206
207 protected override val nextTime: Long
208 get() {
209 if (super.nextTime == 0L) return 0L
210 val queue = _queue.value
211 when {
212 queue === null -> {} // empty queue -- proceed
213 queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
214 queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
215 else -> return 0 // non-empty queue
216 }
Roman Elizarov48093932019-07-06 13:13:32 +0300217 val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
Roman Elizarov98a97052019-07-02 15:30:47 +0300218 return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
219 }
220
221 override fun shutdown() {
222 // Clean up thread-local reference here -- this event loop is shutting down
223 ThreadLocalEventLoop.resetEventLoop()
224 // We should signal that this event loop should not accept any more tasks
225 // and process queued events (that could have been added after last processNextEvent)
226 isCompleted = true
227 closeQueue()
228 // complete processing of all queued tasks
229 while (processNextEvent() <= 0) { /* spin */ }
230 // reschedule the rest of delayed tasks
231 rescheduleAllDelayed()
232 }
233
Roman Elizarov48093932019-07-06 13:13:32 +0300234 public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
235 val timeNanos = delayToNanos(timeMillis)
236 if (timeNanos < MAX_DELAY_NS) {
237 val now = nanoTime()
238 DelayedResumeTask(now + timeNanos, continuation).also { task ->
Steve Elliottca095be2022-07-25 14:26:10 +0000239 /*
240 * Order is important here: first we schedule the heap and only then
241 * publish it to continuation. Otherwise, `DelayedResumeTask` would
242 * have to know how to be disposed of even when it wasn't scheduled yet.
243 */
Roman Elizarov48093932019-07-06 13:13:32 +0300244 schedule(now, task)
Steve Elliottca095be2022-07-25 14:26:10 +0000245 continuation.disposeOnCancellation(task)
Roman Elizarov48093932019-07-06 13:13:32 +0300246 }
247 }
248 }
249
250 protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
251 val timeNanos = delayToNanos(timeMillis)
252 return if (timeNanos < MAX_DELAY_NS) {
253 val now = nanoTime()
254 DelayedRunnableTask(now + timeNanos, block).also { task ->
255 schedule(now, task)
256 }
257 } else {
258 NonDisposableHandle
259 }
260 }
Roman Elizarov98a97052019-07-02 15:30:47 +0300261
262 override fun processNextEvent(): Long {
263 // unconfined events take priority
dkhalanskyjb96b41b42020-03-30 18:54:09 +0300264 if (processUnconfinedEvent()) return 0
Roman Elizarov98a97052019-07-02 15:30:47 +0300265 // queue all delayed tasks that are due to be executed
266 val delayed = _delayed.value
267 if (delayed != null && !delayed.isEmpty) {
268 val now = nanoTime()
269 while (true) {
270 // make sure that moving from delayed to queue removes from delayed only after it is added to queue
271 // to make sure that 'isEmpty' and `nextTime` that check both of them
272 // do not transiently report that both delayed and queue are empty during move
273 delayed.removeFirstIf {
274 if (it.timeToExecute(now)) {
275 enqueueImpl(it)
276 } else
277 false
278 } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
279 }
280 }
281 // then process one event from queue
dkhalanskyjb96b41b42020-03-30 18:54:09 +0300282 val task = dequeue()
283 if (task != null) {
Steve Elliottca095be2022-07-25 14:26:10 +0000284 platformAutoreleasePool { task.run() }
dkhalanskyjb96b41b42020-03-30 18:54:09 +0300285 return 0
286 }
Roman Elizarov98a97052019-07-02 15:30:47 +0300287 return nextTime
288 }
289
290 public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
291
Steve Elliottca095be2022-07-25 14:26:10 +0000292 open fun enqueue(task: Runnable) {
Roman Elizarov98a97052019-07-02 15:30:47 +0300293 if (enqueueImpl(task)) {
294 // todo: we should unpark only when this delayed task became first in the queue
295 unpark()
296 } else {
297 DefaultExecutor.enqueue(task)
298 }
299 }
300
301 @Suppress("UNCHECKED_CAST")
302 private fun enqueueImpl(task: Runnable): Boolean {
303 _queue.loop { queue ->
304 if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
305 when (queue) {
306 null -> if (_queue.compareAndSet(null, task)) return true
307 is Queue<*> -> {
308 when ((queue as Queue<Runnable>).addLast(task)) {
309 Queue.ADD_SUCCESS -> return true
310 Queue.ADD_CLOSED -> return false
311 Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
312 }
313 }
314 else -> when {
315 queue === CLOSED_EMPTY -> return false
316 else -> {
317 // update to full-blown queue to add one more
318 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
319 newQueue.addLast(queue as Runnable)
320 newQueue.addLast(task)
321 if (_queue.compareAndSet(queue, newQueue)) return true
322 }
323 }
324 }
325 }
326 }
327
328 @Suppress("UNCHECKED_CAST")
329 private fun dequeue(): Runnable? {
330 _queue.loop { queue ->
331 when (queue) {
332 null -> return null
333 is Queue<*> -> {
334 val result = (queue as Queue<Runnable>).removeFirstOrNull()
335 if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
336 _queue.compareAndSet(queue, queue.next())
337 }
338 else -> when {
339 queue === CLOSED_EMPTY -> return null
340 else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
341 }
342 }
343 }
344 }
345
346 private fun closeQueue() {
347 assert { isCompleted }
348 _queue.loop { queue ->
349 when (queue) {
350 null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
351 is Queue<*> -> {
352 queue.close()
353 return
354 }
355 else -> when {
356 queue === CLOSED_EMPTY -> return
357 else -> {
358 // update to full-blown queue to close
359 val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
360 newQueue.addLast(queue as Runnable)
361 if (_queue.compareAndSet(queue, newQueue)) return
362 }
363 }
364 }
365 }
366
367 }
368
Roman Elizarov48093932019-07-06 13:13:32 +0300369 public fun schedule(now: Long, delayedTask: DelayedTask) {
370 when (scheduleImpl(now, delayedTask)) {
Roman Elizarov98a97052019-07-02 15:30:47 +0300371 SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
Roman Elizarov48093932019-07-06 13:13:32 +0300372 SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
Roman Elizarov98a97052019-07-02 15:30:47 +0300373 SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
374 else -> error("unexpected result")
375 }
376 }
377
378 private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
379
Roman Elizarov48093932019-07-06 13:13:32 +0300380 private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
Roman Elizarov98a97052019-07-02 15:30:47 +0300381 if (isCompleted) return SCHEDULE_COMPLETED
Roman Elizarov48093932019-07-06 13:13:32 +0300382 val delayedQueue = _delayed.value ?: run {
383 _delayed.compareAndSet(null, DelayedTaskQueue(now))
Roman Elizarov98a97052019-07-02 15:30:47 +0300384 _delayed.value!!
385 }
Roman Elizarov48093932019-07-06 13:13:32 +0300386 return delayedTask.scheduleTask(now, delayedQueue, this)
Roman Elizarov98a97052019-07-02 15:30:47 +0300387 }
388
389 // It performs "hard" shutdown for test cleanup purposes
390 protected fun resetAll() {
391 _queue.value = null
392 _delayed.value = null
393 }
394
395 // This is a "soft" (normal) shutdown
396 private fun rescheduleAllDelayed() {
Roman Elizarov48093932019-07-06 13:13:32 +0300397 val now = nanoTime()
Roman Elizarov98a97052019-07-02 15:30:47 +0300398 while (true) {
399 /*
400 * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
401 * synchronized on DelayedTask itself. All other operation are synchronized both on
402 * DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
403 * first removes DelayedTask from the heap (under synchronization) then
404 * assign "_heap = DISPOSED_TASK", so there cannot be ever a race to _heap reference update.
405 */
406 val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
Roman Elizarov48093932019-07-06 13:13:32 +0300407 reschedule(now, delayedTask)
Roman Elizarov98a97052019-07-02 15:30:47 +0300408 }
409 }
410
411 internal abstract class DelayedTask(
Roman Elizarov48093932019-07-06 13:13:32 +0300412 /**
413 * This field can be only modified in [scheduleTask] before putting this DelayedTask
414 * into heap to avoid overflow and corruption of heap data structure.
415 */
416 @JvmField var nanoTime: Long
Roman Elizarov98a97052019-07-02 15:30:47 +0300417 ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
Steve Elliottca095be2022-07-25 14:26:10 +0000418 @Volatile
Roman Elizarov98a97052019-07-02 15:30:47 +0300419 private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
420
421 override var heap: ThreadSafeHeap<*>?
422 get() = _heap as? ThreadSafeHeap<*>
423 set(value) {
424 require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
425 _heap = value
426 }
427
428 override var index: Int = -1
429
Roman Elizarov98a97052019-07-02 15:30:47 +0300430 override fun compareTo(other: DelayedTask): Int {
431 val dTime = nanoTime - other.nanoTime
432 return when {
433 dTime > 0 -> 1
434 dTime < 0 -> -1
435 else -> 0
436 }
437 }
438
439 fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
440
441 @Synchronized
Roman Elizarov48093932019-07-06 13:13:32 +0300442 fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
Roman Elizarov98a97052019-07-02 15:30:47 +0300443 if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
Roman Elizarov48093932019-07-06 13:13:32 +0300444 delayed.addLastIf(this) { firstTask ->
445 if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
446 /**
447 * We are about to add new task and we have to make sure that [DelayedTaskQueue]
448 * invariant is maintained. The code in this lambda is additionally executed under
449 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
450 */
451 if (firstTask == null) {
452 /**
453 * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
454 * the current now time even if that means "going backwards in time". This makes the structure
455 * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
456 * are removed from the delayed queue for execution.
457 */
458 delayed.timeNow = now
459 } else {
460 /**
461 * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
462 * and only goes forward in time. We cannot let it go backwards in time or invariant can be
463 * violated for tasks that were already scheduled.
464 */
465 val firstTime = firstTask.nanoTime
466 // compute min(now, firstTime) using a wrap-safe check
467 val minTime = if (firstTime - now >= 0) now else firstTime
468 // update timeNow only when going forward in time
469 if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
470 }
471 /**
472 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
473 * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
474 * function can be called to reschedule from one queue to another and this might be another reason
475 * where new task's time might now violate invariant.
476 * We correct invariant violation (if any) by simply changing this task's time to now.
477 */
478 if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
479 true
480 }
481 return SCHEDULE_OK
Roman Elizarov98a97052019-07-02 15:30:47 +0300482 }
483
Roman Elizarov98a97052019-07-02 15:30:47 +0300484 @Synchronized
485 final override fun dispose() {
486 val heap = _heap
487 if (heap === DISPOSED_TASK) return // already disposed
488 @Suppress("UNCHECKED_CAST")
Roman Elizarov48093932019-07-06 13:13:32 +0300489 (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
Roman Elizarov98a97052019-07-02 15:30:47 +0300490 _heap = DISPOSED_TASK // never add again to any heap
491 }
492
493 override fun toString(): String = "Delayed[nanos=$nanoTime]"
494 }
495
496 private inner class DelayedResumeTask(
Roman Elizarov48093932019-07-06 13:13:32 +0300497 nanoTime: Long,
Roman Elizarov98a97052019-07-02 15:30:47 +0300498 private val cont: CancellableContinuation<Unit>
Roman Elizarov48093932019-07-06 13:13:32 +0300499 ) : DelayedTask(nanoTime) {
500 override fun run() { with(cont) { resumeUndispatched(Unit) } }
501 override fun toString(): String = super.toString() + cont.toString()
Roman Elizarov98a97052019-07-02 15:30:47 +0300502 }
503
Roman Elizarov48093932019-07-06 13:13:32 +0300504 private class DelayedRunnableTask(
505 nanoTime: Long,
Roman Elizarov98a97052019-07-02 15:30:47 +0300506 private val block: Runnable
Roman Elizarov48093932019-07-06 13:13:32 +0300507 ) : DelayedTask(nanoTime) {
Roman Elizarov98a97052019-07-02 15:30:47 +0300508 override fun run() { block.run() }
509 override fun toString(): String = super.toString() + block.toString()
510 }
Roman Elizarov48093932019-07-06 13:13:32 +0300511
512 /**
513 * Delayed task queue maintains stable time-comparision invariant despite potential wraparounds in
514 * long nano time measurements by maintaining last observed [timeNow]. It protects the integrity of the
515 * heap data structure in spite of potential non-monotonicity of `nanoTime()` source.
516 * The invariant is that for every scheduled [DelayedTask]:
517 *
518 * ```
519 * delayedTask.nanoTime - timeNow >= 0
520 * ```
521 *
522 * So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable as
523 * scheduled [DelayedTask.nanoTime] can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
524 * new tasks are added by [DelayedTask.scheduleTask] function and it cannot be violated when tasks are removed
525 * (so there is nothing special to do there).
526 */
527 internal class DelayedTaskQueue(
528 @JvmField var timeNow: Long
529 ) : ThreadSafeHeap<DelayedTask>()
Roman Elizarov98a97052019-07-02 15:30:47 +0300530}
531
Roman Elizarov51738242018-12-21 16:41:39 +0300532internal expect fun createEventLoop(): EventLoop
533
Roman Elizarov98a97052019-07-02 15:30:47 +0300534internal expect fun nanoTime(): Long
535
536internal expect object DefaultExecutor {
537 public fun enqueue(task: Runnable)
Roman Elizarov98a97052019-07-02 15:30:47 +0300538}
539
Steve Elliottca095be2022-07-25 14:26:10 +0000540/**
541 * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
542 * non-Darwin native targets.
543 *
544 * Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
545 * the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
546 * be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
547 * pool management, it must manage the pool creation and pool drainage manually.
548 */
549internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)