blob: 5061d532abf8bf7554808e308adf1bcfacf1ef00 [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Roman Elizarov53a0a402017-01-19 20:21:57 +030017package kotlinx.coroutines.experimental
18
Roman Elizarov5d94a262017-12-28 00:23:39 +030019import kotlinx.atomicfu.*
20import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatovcd006432018-04-26 16:03:40 +030021import kotlinx.coroutines.experimental.timeunit.*
Roman Elizarov5d94a262017-12-28 00:23:39 +030022import java.util.concurrent.locks.*
23import kotlin.coroutines.experimental.*
Vsevolod Tolstopyatovcd006432018-04-26 16:03:40 +030024import kotlin.jvm.*
Roman Elizarov53a0a402017-01-19 20:21:57 +030025
/**
 * Contract for [CoroutineDispatcher] implementations that run an event loop internally and can
 * be asked to process the next event from their event queue.
 *
 * An implementation may optionally implement the [Delay] interface to support time-scheduled tasks.
 * [runBlocking] uses this interface to continue processing events when it is invoked from the
 * event dispatch thread.
 */
public interface EventLoop {
    /**
     * Processes the next event in this event loop.
     *
     * The returned value is to be interpreted as follows:
     * * `<= 0` -- there are potentially more events for immediate processing;
     * * `> 0` -- the number of nanoseconds to wait for the next scheduled event;
     * * [Long.MAX_VALUE] -- there are no more events, or the function was invoked from the wrong thread.
     */
    public fun processNextEvent(): Long

    /** @suppress **Deprecated** */
    @Deprecated(message = "Companion object to be removed, no replacement")
    public companion object Factory {
        /** @suppress **Deprecated** */
        @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
        public operator fun invoke(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher {
            val loop = EventLoopImpl(thread)
            // Attach the parent job only when the caller supplied one.
            parentJob?.let { loop.initParentJob(it) }
            return loop
        }
    }
}
55
/**
 * Creates a new event loop that is bound to the specified [thread] (the current thread by default) and
 * stops accepting new events when [parentJob] completes. Every continuation that is scheduled
 * onto this event loop unparks the specified thread via [LockSupport.unpark].
 *
 * The main event-processing loop that drives the resulting `eventLoop` object should look like this:
 * ```
 * while (needsToBeRunning) {
 *     if (Thread.interrupted()) break // or handle somehow
 *     LockSupport.parkNanos(eventLoop.processNextEvent()) // event loop will unpark
 * }
 * ```
 *
 * @param thread the thread the event loop is bound to.
 * @param parentJob optional job whose completion makes the event loop stop accepting new events.
 */
@Suppress("FunctionName")
public fun EventLoop(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher =
    EventLoopImpl(thread).also { loop ->
        if (parentJob != null) loop.initParentJob(parentJob)
    }
74
// Values of `DelayedTask.state`:
// DELAYED     -- initial state of a delayed task created by an event loop
// REMOVED     -- the task was disposed (or could not be rescheduled) and must not be touched again
// RESCHEDULED -- the task was handed over to DefaultExecutor during event loop shutdown
private const val DELAYED = 0
private const val REMOVED = 1
private const val RESCHEDULED = 2

// Marker stored in the task queue slot when an empty queue gets closed.
@Suppress("PrivatePropertyName")
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

// Short local name for the queue implementation used to hold more than one pending task.
private typealias Queue<T> = LockFreeMPSCQueueCore<T>
83
/**
 * Base class for event-loop dispatchers. It keeps a queue of tasks that are ready to run and a heap of
 * time-delayed tasks, and implements [CoroutineDispatcher], [Delay] and [EventLoop] on top of them.
 *
 * Subclasses supply [isCompleted], [unpark] and [isCorrectThread]. Tasks that can no longer be accepted
 * by this loop (see [execute] and [schedule]) are forwarded to [DefaultExecutor].
 */
internal abstract class EventLoopBase : CoroutineDispatcher(), Delay, EventLoop {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only once
    private val _delayed = atomic<ThreadSafeHeap<DelayedTask>?>(null)

    /** `true` when this loop must stop accepting new tasks. */
    protected abstract val isCompleted: Boolean

    /** Invoked after a task was successfully added by [execute] or [schedule]. */
    protected abstract fun unpark()

    /** Tells whether the caller is allowed to process events; [processNextEvent] does nothing otherwise. */
    protected abstract fun isCorrectThread(): Boolean

    /** `true` when both the task queue and the delayed heap are empty. */
    protected val isEmpty: Boolean
        get() = isQueueEmpty && isDelayedEmpty

    private val isQueueEmpty: Boolean get() {
        val queue = _queue.value
        return when (queue) {
            null -> true
            is Queue<*> -> queue.isEmpty
            else -> queue === CLOSED_EMPTY // anything else is a single pending task
        }
    }

    private val isDelayedEmpty: Boolean get() {
        val delayed = _delayed.value
        return delayed == null || delayed.isEmpty
    }

    // Value returned from processNextEvent: 0 when the queue has tasks, otherwise the non-negative number
    // of nanoseconds until the first delayed task, or Long.MAX_VALUE when there is no delayed task.
    private val nextTime: Long
        get() {
            if (!isQueueEmpty) return 0
            val delayed = _delayed.value ?: return Long.MAX_VALUE
            val nextDelayedTask = delayed.peek() ?: return Long.MAX_VALUE
            return (nextDelayedTask.nanoTime - timeSource.nanoTime()).coerceAtLeast(0)
        }

    override fun dispatch(context: CoroutineContext, block: Runnable) =
        execute(block)

    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
        schedule(DelayedResumeTask(time, unit, continuation))

    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
        DelayedRunnableTask(time, unit, block).also { schedule(it) }

    override fun processNextEvent(): Long {
        if (!isCorrectThread()) return Long.MAX_VALUE
        // queue all delayed tasks that are due to be executed
        val delayed = _delayed.value
        if (delayed != null && !delayed.isEmpty) {
            val now = timeSource.nanoTime()
            while (true) {
                // make sure that moving from delayed to queue removes from delayed only after it is added to queue
                // to make sure that 'isEmpty' and `nextTime` that check both of them
                // do not transiently report that both delayed and queue are empty during move
                delayed.removeFirstIf {
                    if (it.timeToExecute(now)) {
                        enqueueImpl(it)
                    } else
                        false
                } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
            }
        }
        // then process one event from queue
        dequeue()?.run()
        return nextTime
    }

    /**
     * Adds [task] to this loop's queue and calls [unpark]; when the loop does not accept
     * the task any more, the task is passed to [DefaultExecutor] instead.
     */
    @Suppress("MemberVisibilityCanBePrivate") // todo: remove suppress when KT-22030 is fixed
    internal fun execute(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this task became first in the queue
            unpark()
        } else
            DefaultExecutor.execute(task)
    }

    // Returns true when the task was stored in this loop's queue, false when the loop is completed/closed.
    @Suppress("UNCHECKED_CAST")
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }

    // Takes one task from the queue, or returns null when there is none.
    @Suppress("UNCHECKED_CAST")
    private fun dequeue(): Runnable? {
        _queue.loop { queue ->
            when (queue) {
                null -> return null
                is Queue<*> -> {
                    val result = (queue as Queue<Runnable>).removeFirstOrNull()
                    if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                    _queue.compareAndSet(queue, queue.next())
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return null
                    else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
                }
            }
        }
    }

    /**
     * Closes the task queue so that [enqueueImpl] stops accepting tasks.
     * Tasks that are already queued stay available to [dequeue].
     */
    protected fun closeQueue() {
        assert(isCompleted)
        _queue.loop { queue ->
            when (queue) {
                null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
                is Queue<*> -> {
                    queue.close()
                    return
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return
                    else -> {
                        // update to full-blown queue to close
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY)
                        newQueue.addLast(queue as Runnable)
                        // Deliberately no `return` here: whether the CAS wins or loses, the next
                        // iteration re-reads the slot; on success it sees the Queue and closes it above.
                        // Returning right after the CAS would leave the new queue open for producers.
                        _queue.compareAndSet(queue, newQueue)
                    }
                }
            }
        }
    }

    /**
     * Adds [delayedTask] to this loop's delayed heap and calls [unpark]; when the loop does not accept
     * the task any more, the task is passed to [DefaultExecutor] instead.
     */
    internal fun schedule(delayedTask: DelayedTask) {
        if (scheduleImpl(delayedTask)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else
            DefaultExecutor.schedule(delayedTask)
    }

    private fun scheduleImpl(delayedTask: DelayedTask): Boolean {
        if (isCompleted) return false
        val delayed = _delayed.value ?: run {
            // Either this CAS or a concurrent one installs the heap, so the slot is non-null afterwards
            // (barring resetAll, which is a test-only hard reset).
            _delayed.compareAndSet(null, ThreadSafeHeap())
            _delayed.value!!
        }
        return delayed.addLastIf(delayedTask) { !isCompleted }
    }

    internal fun removeDelayedImpl(delayedTask: DelayedTask) {
        _delayed.value?.remove(delayedTask)
    }

    // It performs "hard" shutdown for test cleanup purposes
    protected fun resetAll() {
        _queue.value = null
        _delayed.value = null
    }

    // This is a "soft" (normal) shutdown: every task still in the delayed heap is taken out of it
    // and offered to DefaultExecutor via rescheduleOnShutdown.
    protected fun rescheduleAllDelayed() {
        while (true) {
            val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
            delayedTask.rescheduleOnShutdown()
        }
    }

    /**
     * A task that becomes due at [nanoTime]. It is ordered by that time and can be cancelled
     * through [dispose].
     */
    internal abstract inner class DelayedTask(
        time: Long, timeUnit: TimeUnit
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        override var index: Int = -1
        var state = DELAYED // Guarded by lock on this task for reschedule/dispose purposes
        @JvmField val nanoTime: Long = timeSource.nanoTime() + timeUnit.toNanos(time)

        override fun compareTo(other: DelayedTask): Int {
            // compare via the difference rather than the raw values
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        /** `true` when this task is due at the time [now] (in nanoseconds of `timeSource`). */
        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        /**
         * Hands this task over to [DefaultExecutor] unless it was already disposed or rescheduled.
         *
         * The shutdown path ([rescheduleAllDelayed]) has already taken the task out of the heap before
         * calling this function, so rescheduling must not depend on removing it from the heap again.
         */
        fun rescheduleOnShutdown() {
            synchronized(this) {
                if (state != DELAYED) return
                // Defensive: make sure the task is not left in this loop's heap; the result is
                // intentionally ignored because the caller normally removed the task already.
                // NOTE(review): assumes ThreadSafeHeap.remove is a harmless no-op for an absent node -- confirm.
                _delayed.value?.remove(this)
                state = RESCHEDULED
                DefaultExecutor.schedule(this)
            }
        }

        final override fun dispose() {
            synchronized(this) {
                when (state) {
                    DELAYED -> _delayed.value?.remove(this)
                    RESCHEDULED -> DefaultExecutor.removeDelayedImpl(this)
                    else -> return
                }
                state = REMOVED
            }
        }

        override fun toString(): String = "Delayed[nanos=$nanoTime]"
    }

    // Delayed task that resumes the given continuation when run.
    private inner class DelayedResumeTask(
        time: Long, timeUnit: TimeUnit,
        private val cont: CancellableContinuation<Unit>
    ) : DelayedTask(time, timeUnit) {

        init {
            // Note that this operation isn't lock-free, but very short
            cont.disposeOnCancellation(this)
        }

        override fun run() {
            with(cont) { resumeUndispatched(Unit) }
        }
    }

    // Delayed task that runs the given block when run.
    private inner class DelayedRunnableTask(
        time: Long, timeUnit: TimeUnit,
        private val block: Runnable
    ) : DelayedTask(time, timeUnit) {
        override fun run() { block.run() }
        override fun toString(): String = super.toString() + block.toString()
    }
}
Roman Elizarov35d2c342017-07-20 14:54:39 +0300326
/**
 * An [EventLoopBase] bound to a single [thread]: events are processed only on that thread, and
 * it is unparked whenever work is submitted from any other thread.
 */
internal abstract class ThreadEventLoop(
    private val thread: Thread
) : EventLoopBase() {
    override fun isCorrectThread(): Boolean = thread === Thread.currentThread()

    override fun unpark() {
        // The bound thread is the caller itself, so there is nothing to wake up.
        if (Thread.currentThread() === thread) return
        timeSource.unpark(thread)
    }

    /**
     * Closes the task queue, runs everything that is still queued on the calling thread and
     * then reschedules the remaining delayed tasks.
     */
    fun shutdown() {
        closeQueue()
        assert(isCorrectThread())
        // complete processing of all queued tasks
        do {
            val waitNanos = processNextEvent()
        } while (waitNanos <= 0)
        // reschedule the rest of delayed tasks
        rescheduleAllDelayed()
    }
}
346
/**
 * [ThreadEventLoop] whose completion follows an optional parent [Job]: the loop is
 * considered completed once a parent job was attached and that job is completed.
 */
private class EventLoopImpl(thread: Thread) : ThreadEventLoop(thread) {
    private var parentJob: Job? = null

    override val isCompleted: Boolean
        get() = parentJob?.isCompleted ?: false // no parent job -- never completed

    /** Attaches [parentJob]; fails with [IllegalArgumentException] when a parent job is already attached. */
    fun initParentJob(parentJob: Job) {
        val current = this.parentJob
        require(current == null)
        this.parentJob = parentJob
    }
}
357
/**
 * [ThreadEventLoop] whose completion flag is a plain volatile property set directly by its owner.
 */
internal class BlockingEventLoop(thread: Thread) : ThreadEventLoop(thread) {
    // Explicitly public: the owner of the loop flips this flag from outside the class hierarchy.
    @Volatile public override var isCompleted: Boolean = false
}