blob: 66ea0b4a8cb0ecb288fea54830bab771bbfb48c8 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov53a0a402017-01-19 20:21:57 +03005package kotlinx.coroutines.experimental
6
Roman Elizarov5d94a262017-12-28 00:23:39 +03007import kotlinx.atomicfu.*
8import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatovcd006432018-04-26 16:03:40 +03009import kotlinx.coroutines.experimental.timeunit.*
Roman Elizarov5d94a262017-12-28 00:23:39 +030010import java.util.concurrent.locks.*
11import kotlin.coroutines.experimental.*
Vsevolod Tolstopyatovcd006432018-04-26 16:03:40 +030012import kotlin.jvm.*
Roman Elizarov53a0a402017-01-19 20:21:57 +030013
/**
 * Contract for [CoroutineDispatcher] implementations that run an event loop internally and can
 * be asked to process the next event from their event queue.
 *
 * An implementation may additionally implement the [Delay] interface to support time-scheduled tasks.
 * [runBlocking] uses this interface to keep processing events when it is invoked from the
 * event dispatch thread.
 */
public interface EventLoop {
    /**
     * Processes the next event in this event loop.
     *
     * The returned value is to be read as follows:
     * * `<= 0` -- there may be more events ready for immediate processing;
     * * `> 0` -- the number of nanoseconds to wait until the next scheduled event;
     * * [Long.MAX_VALUE] -- there are no more events, or the call was made from the wrong thread.
     */
    public fun processNextEvent(): Long

    /** @suppress **Deprecated **/
    @Deprecated(message = "Companion object to be removed, no replacement")
    public companion object Factory {
        /** @suppress **Deprecated **/
        @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
        public operator fun invoke(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher {
            val loop = EventLoopImpl(thread)
            // the parent job is optional; attach it only when the caller supplied one
            parentJob?.let { loop.initParentJob(it) }
            return loop
        }
    }
}
43
/**
 * Creates a new event loop that is bound to the specified [thread] (the current thread by default) and
 * stops accepting new events once [parentJob] completes. Every continuation that is scheduled
 * onto this event loop unparks the specified thread via [LockSupport.unpark].
 *
 * The main event-processing loop that drives the resulting `eventLoop` object should look like this:
 * ```
 * while (needsToBeRunning) {
 *     if (Thread.interrupted()) break // or handle somehow
 *     LockSupport.parkNanos(eventLoop.processNextEvent()) // event loop will unpark
 * }
 * ```
 */
@Suppress("FunctionName")
public fun EventLoop(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher {
    val loop = EventLoopImpl(thread)
    if (parentJob != null) loop.initParentJob(parentJob)
    return loop
}
62
// Values of EventLoopBase.DelayedTask.state
private const val DELAYED = 0     // initial state of every newly created delayed task
private const val REMOVED = 1     // set by dispose(), and by rescheduleOnShutdown() when removal from the heap fails
private const val RESCHEDULED = 2 // set by rescheduleOnShutdown() after the task was handed over to DefaultExecutor

// Marker that closeQueue() stores into EventLoopBase._queue when the queue is closed while it holds no tasks
@Suppress("PrivatePropertyName")
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

// Short local name for the lock-free queue type that EventLoopBase uses to hold pending tasks
private typealias Queue<T> = LockFreeMPSCQueueCore<T>
71
/**
 * Common machinery of event loops in this file: a [CoroutineDispatcher] that also implements
 * [Delay] and [EventLoop].
 *
 * It keeps two structures:
 * * `_queue` -- tasks that are ready to run; it holds `null`, a single task, a [Queue] of tasks,
 *   or [CLOSED_EMPTY] after the queue was closed while empty;
 * * `_delayed` -- a heap of [DelayedTask] instances that are waiting for their time to come.
 *
 * Subclasses tell the loop when it is completed ([isCompleted]), how to wake up the processing
 * thread ([unpark]) and whether the caller is that thread ([isCorrectThread]).
 * Work that is rejected because the loop is completed is handed over to [DefaultExecutor].
 */
internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated lazily by scheduleImpl on first use (resetAll drops it again)
    private val _delayed = atomic<ThreadSafeHeap<DelayedTask>?>(null)

    /** `true` once this loop must not accept new work any more. */
    protected abstract val isCompleted: Boolean

    /** Wakes up the thread that processes this loop after new work was added. */
    protected abstract fun unpark()

    /** Returns `true` when the calling thread is the one that is allowed to process events. */
    protected abstract fun isCorrectThread(): Boolean

    /** `true` when there are neither queued nor delayed tasks. */
    protected val isEmpty: Boolean
        get() = isQueueEmpty && isDelayedEmpty

    private val isQueueEmpty: Boolean get() {
        val queue = _queue.value
        return when (queue) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY // a single pending task is stored directly -> not empty
        }
    }

    private val isDelayedEmpty: Boolean get() {
        val delayed = _delayed.value
        return delayed == null || delayed.isEmpty
    }

    /**
     * Value reported by [processNextEvent]: `0` when the queue has tasks, [Long.MAX_VALUE] when there
     * are no delayed tasks either, otherwise the non-negative number of nanoseconds until the
     * earliest delayed task is due.
     */
    private val nextTime: Long
        get() {
            if (!isQueueEmpty) return 0
            val delayed = _delayed.value ?: return Long.MAX_VALUE
            val nextDelayedTask = delayed.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - timeSource.nanoTime()).coerceAtLeast(0)
        }

    override fun dispatch(context: CoroutineContext, block: Runnable) =
        execute(block)

    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
        schedule(DelayedResumeTask(time, unit, continuation))

    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
        DelayedRunnableTask(time, unit, block).also { schedule(it) }

    /**
     * Moves all delayed tasks that are due into the queue, runs at most one queued task and
     * returns [nextTime]. Returns [Long.MAX_VALUE] right away when called from the wrong thread.
     */
    override fun processNextEvent(): Long {
        if (!isCorrectThread()) return Long.MAX_VALUE
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = timeSource.nanoTime()
            while (true) {
                // make sure that moving from delayed to queue removes from delayed only after it is added to queue
                // to make sure that 'isEmpty' and `nextTime` that check both of them
                // do not transiently report that both delayed and queue are empty during move
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
            }
        }
        // then process one event from queue
        dequeue()?.run()
        return nextTime
    }

    /**
     * Adds [task] to this loop and wakes up its thread; when the loop does not accept the task
     * (it is completed or its queue is closed) the task is executed by [DefaultExecutor] instead.
     */
    @Suppress("MemberVisibilityCanBePrivate") // todo: remove suppress when KT-22030 is fixed
    internal fun execute(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this task became first in the queue
            unpark()
        } else
            DefaultExecutor.execute(task)
    }

    /**
     * Tries to add [task] to the queue.
     *
     * @return `true` when the task was added, `false` when the loop is completed or the queue is closed.
     */
    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        // this queue instance is frozen: move on to its successor and retry
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }

    /** Removes and returns the next queued task, or `null` when there is none. */
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    // this queue instance is frozen: move on to its successor and retry
                    _queue.compareAndSet(queue, queue.next())
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    // a single task is stored directly: take it and leave the slot empty
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
                }
            }
        }
    }

    /**
     * Closes the task queue. Must be called only after [isCompleted] became `true`.
     * Tasks that are already queued stay available to [dequeue].
     */
    protected fun closeQueue() {
        assert(isCompleted)
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to full-blown queue to close
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
                        newQueue.addLast(queue as Runnable)
                        // Do not return here: the queue that was just installed is still open.
                        // Whether or not this CAS wins, the next iteration re-reads _queue and,
                        // on seeing a Queue, closes it in the branch above.
                        _queue.compareAndSet(queue, newQueue)
                    }
                }
            }
        }
    }

    /**
     * Adds [delayedTask] to this loop and wakes up its thread; when the loop does not accept the
     * task it is scheduled on [DefaultExecutor] instead.
     */
    internal fun schedule(delayedTask: DelayedTask) {
        if (scheduleImpl(delayedTask)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else
            DefaultExecutor.schedule(delayedTask)
    }

    /**
     * Tries to add [delayedTask] to the delayed heap, allocating the heap on first use.
     *
     * @return `false` when the loop is completed (checked both before and while adding).
     */
    private fun scheduleImpl(delayedTask: DelayedTask): Boolean {
        if (isCompleted) return false
        val delayed = _delayed.value ?: run {
            // either this CAS or a concurrent one installs the heap, so the value is non-null afterwards
            // (as long as resetAll is not racing with this call)
            _delayed.compareAndSet(null, ThreadSafeHeap())
            _delayed.value!!
        }
        return delayed.addLastIf(delayedTask) { !isCompleted }
    }

    /** Removes [delayedTask] from the delayed heap of this loop, if the heap exists. */
    internal fun removeDelayedImpl(delayedTask: DelayedTask) {
        _delayed.value?.remove(delayedTask)
    }

    // It performs "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown
    protected fun rescheduleAllDelayed() {
        while (true) {
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            delayedTask.rescheduleOnShutdown()
        }
    }

    /**
     * A task that becomes due at [nanoTime], computed at construction as the current
     * `timeSource.nanoTime()` plus the requested delay. It lives in the delayed heap of the
     * enclosing loop and can be disposed via [DisposableHandle].
     */
    internal abstract inner class DelayedTask(
        time: Long, timeUnit: TimeUnit
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        override var index: Int = -1
        var state = DELAYED // Guarded by lock on this task for reschedule/dispose purposes
        @JvmField val nanoTime: Long = timeSource.nanoTime() + timeUnit.toNanos(time)

        // compares via the sign of the difference, the same way timeToExecute does
        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        /** `true` when this task is due at the time instant [now]. */
        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        // NOTE(review): the only caller visible here (rescheduleAllDelayed) has already taken this task
        // out of the heap with removeFirstOrNull. If ThreadSafeHeap.remove returns false for a node that
        // is no longer in the heap, such a task ends up REMOVED and is never passed to DefaultExecutor.
        // Confirm against ThreadSafeHeap.
        fun rescheduleOnShutdown() = synchronized(this) {
            if (state != DELAYED) return
            if (_delayed.value!!.remove(this)) {
                state = RESCHEDULED
                DefaultExecutor.schedule(this)
            } else
                state = REMOVED
        }

        final override fun dispose() = synchronized(this) {
            when (state) {
                DELAYED -> _delayed.value?.remove(this)
                RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
                else -> return // already removed, nothing to do
            }
            state = REMOVED
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    /** Delayed task that resumes [cont] when run; it is disposed when [cont] is cancelled. */
    private inner class DelayedResumeTask(
        time: Long, timeUnit: TimeUnit,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(time, timeUnit) {

        init {
            // Note that this operation isn't lock-free, but very short
            cont.disposeOnCancellation(this)
        }

        override fun run() {
            with(cont) { resumeUndispatched(Unit) }
        }
    }

    /** Delayed task that runs [block] when run. */
    private inner class DelayedRunnableTask(
        time: Long, timeUnit: TimeUnit,
        private val block: Runnable
    ) : DelayedTask(time, timeUnit) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }
}
Roman Elizarov35d2c342017-07-20 14:54:39 +0300314
/**
 * An [EventLoopBase] whose events are processed by one particular [thread]:
 * only that thread passes the [isCorrectThread] check, and [unpark] wakes it up.
 */
internal abstract class ThreadEventLoop(
    private val thread: Thread
) : EventLoopBase() {
    override fun isCorrectThread(): Boolean {
        val current = Thread.currentThread()
        return current === thread
    }

    override fun unpark() {
        // the loop thread itself is the caller, so it is running and needs no wake-up
        if (Thread.currentThread() === thread) return
        timeSource.unpark(thread)
    }

    /**
     * Closes the queue, runs everything that is still queued and then passes the remaining
     * delayed tasks to `rescheduleAllDelayed`. Asserts that it is called on the loop thread.
     */
    fun shutdown() {
        closeQueue()
        assert(isCorrectThread())
        // complete processing of all queued tasks: continue while more work may be ready right now
        do {
            val waitNanos = processNextEvent()
        } while (waitNanos <= 0)
        // reschedule the rest of delayed tasks
        rescheduleAllDelayed()
    }
}
334
/**
 * Thread-bound event loop whose completion follows an optional parent [Job]:
 * it is completed when a parent job was set and that job is completed.
 */
private class EventLoopImpl(thread: Thread) : ThreadEventLoop(thread) {
    private var parentJob: Job? = null

    // without a parent job the loop is never considered completed
    override val isCompleted: Boolean
        get() = parentJob?.isCompleted ?: false

    /** Sets the parent job; may be called at most once (fails with `require` otherwise). */
    fun initParentJob(parentJob: Job) {
        require(this.parentJob == null)
        this.parentJob = parentJob
    }
}
345
/**
 * Thread-bound event loop whose completion flag is a plain volatile property
 * that is written directly by the code that owns the loop.
 */
internal class BlockingEventLoop(thread: Thread) : ThreadEventLoop(thread) {
    @Volatile public override var isCompleted: Boolean = false
}