blob: ab2b0bd76dc8ba779512bfbc4013104b19e55b87 [file] [log] [blame]
Roman Elizarov92b04852017-07-11 15:11:58 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov92b04852017-07-11 15:11:58 +03003 */
4
5package kotlinx.coroutines.experimental
6
Roman Elizarovaa461cf2018-04-11 13:20:29 +03007import kotlinx.coroutines.experimental.timeunit.*
8
9internal actual val DefaultDelay: Delay = DefaultExecutor
Roman Elizarov92b04852017-07-11 15:11:58 +030010
Roman Elizarov35d2c342017-07-20 14:54:39 +030011@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
12internal object DefaultExecutor : EventLoopBase(), Runnable {
Roman Elizarov92b04852017-07-11 15:11:58 +030013
Roman Elizarov35d2c342017-07-20 14:54:39 +030014 override val isCompleted: Boolean get() = false
Roman Elizarov92b04852017-07-11 15:11:58 +030015
Roman Elizarov35d2c342017-07-20 14:54:39 +030016 private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
Roman Elizarov92b04852017-07-11 15:11:58 +030017
Roman Elizarov35d2c342017-07-20 14:54:39 +030018 private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
Roman Elizarov92b04852017-07-11 15:11:58 +030019 try {
Roman Elizarov35d2c342017-07-20 14:54:39 +030020 java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
21 } catch (e: SecurityException) {
22 DEFAULT_KEEP_ALIVE
23 })
24
25 @Volatile
26 private var _thread: Thread? = null
27
28 private const val FRESH = 0
29 private const val ACTIVE = 1
30 private const val SHUTDOWN_REQ = 2
31 private const val SHUTDOWN_ACK = 3
32
33 @Volatile
34 private var debugStatus: Int = FRESH
35
Roman Elizarov94833012018-01-12 18:28:24 +030036 private val isShutdownRequested: Boolean get() {
37 val debugStatus = debugStatus
38 return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
39 }
40
Vsevolod Tolstopyatove90cdb02018-08-08 18:04:59 +030041 /**
42 * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
43 * ```
44 * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
45 * ```
46 *
47 * Livelock is possible only if runBlocking is called on [DefaultDispatcher], but it's not exposed as public API
48 */
49 override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
50 DelayedRunnableTask(time, unit, block).also { schedule(it) }
51
Roman Elizarov35d2c342017-07-20 14:54:39 +030052 override fun run() {
Roman Elizarov35d2c342017-07-20 14:54:39 +030053 timeSource.registerTimeLoopThread()
Roman Elizarov35d2c342017-07-20 14:54:39 +030054 try {
Roman Elizarove8986b82018-01-11 18:06:30 +030055 var shutdownNanos = Long.MAX_VALUE
Roman Elizarov94833012018-01-12 18:28:24 +030056 if (!notifyStartup()) return
57 while (true) {
58 Thread.interrupted() // just reset interruption flag
59 var parkNanos = processNextEvent()
60 if (parkNanos == Long.MAX_VALUE) {
61 // nothing to do, initialize shutdown timeout
62 if (shutdownNanos == Long.MAX_VALUE) {
63 val now = timeSource.nanoTime()
64 if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
65 val tillShutdown = shutdownNanos - now
66 if (tillShutdown <= 0) return // shut thread down
67 parkNanos = parkNanos.coerceAtMost(tillShutdown)
68 } else
69 parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
70 }
71 if (parkNanos > 0) {
72 // check if shutdown was requested and bail out in this case
73 if (isShutdownRequested) return
74 timeSource.parkNanos(this, parkNanos)
Roman Elizarov35d2c342017-07-20 14:54:39 +030075 }
76 }
77 } finally {
78 _thread = null // this thread is dead
Roman Elizarov94833012018-01-12 18:28:24 +030079 acknowledgeShutdownIfNeeded()
Roman Elizarov35d2c342017-07-20 14:54:39 +030080 timeSource.unregisterTimeLoopThread()
81 // recheck if queues are empty after _thread reference was set to null (!!!)
82 if (!isEmpty) thread() // recreate thread if it is needed
83 }
Roman Elizarov92b04852017-07-11 15:11:58 +030084 }
85
Roman Elizarov35d2c342017-07-20 14:54:39 +030086 // ensure that thread is there
87 private fun thread(): Thread = _thread ?: createThreadSync()
88
89 @Synchronized
90 private fun createThreadSync() = _thread ?:
91 Thread(this, "kotlinx.coroutines.DefaultExecutor").apply {
92 _thread = this
93 isDaemon = true
94 start()
95 }
96
97 override fun unpark() {
98 timeSource.unpark(thread()) // as a side effect creates thread if it is not there
99 }
100
101 override fun isCorrectThread(): Boolean = true
102
103 // used for tests
104 @Synchronized
105 internal fun ensureStarted() {
106 assert(_thread == null) // ensure we are at a clean state
Roman Elizarove8986b82018-01-11 18:06:30 +0300107 assert(debugStatus == FRESH || debugStatus == SHUTDOWN_ACK)
Roman Elizarov35d2c342017-07-20 14:54:39 +0300108 debugStatus = FRESH
109 createThreadSync() // create fresh thread
110 while (debugStatus == FRESH) (this as Object).wait()
111 }
112
113 @Synchronized
Roman Elizarove8986b82018-01-11 18:06:30 +0300114 private fun notifyStartup(): Boolean {
Roman Elizarov94833012018-01-12 18:28:24 +0300115 if (isShutdownRequested) return false
Roman Elizarov35d2c342017-07-20 14:54:39 +0300116 debugStatus = ACTIVE
117 (this as Object).notifyAll()
Roman Elizarove8986b82018-01-11 18:06:30 +0300118 return true
Roman Elizarov35d2c342017-07-20 14:54:39 +0300119 }
120
121 // used for tests
122 @Synchronized
Roman Elizarov94833012018-01-12 18:28:24 +0300123 fun shutdown(timeout: Long) {
124 val deadline = System.currentTimeMillis() + timeout
125 if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
126 // loop while there is anything to do immediately or deadline passes
127 while (debugStatus != SHUTDOWN_ACK && _thread != null) {
128 _thread?.let { timeSource.unpark(it) } // wake up thread if present
129 val remaining = deadline - System.currentTimeMillis()
130 if (remaining <= 0) break
131 (this as Object).wait(timeout)
Roman Elizarov35d2c342017-07-20 14:54:39 +0300132 }
133 // restore fresh status
134 debugStatus = FRESH
135 }
136
137 @Synchronized
Roman Elizarov94833012018-01-12 18:28:24 +0300138 private fun acknowledgeShutdownIfNeeded() {
139 if (!isShutdownRequested) return
Roman Elizarov35d2c342017-07-20 14:54:39 +0300140 debugStatus = SHUTDOWN_ACK
Roman Elizarov5d94a262017-12-28 00:23:39 +0300141 resetAll() // clear queues
Roman Elizarov35d2c342017-07-20 14:54:39 +0300142 (this as Object).notifyAll()
Roman Elizarov92b04852017-07-11 15:11:58 +0300143 }
144}