blob: 64a9a886cebd50ec2855880ac1e192a94720db82 [file] [log] [blame]
Roman Elizarov92b04852017-07-11 15:11:58 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov92b04852017-07-11 15:11:58 +03003 */
4
5package kotlinx.coroutines.experimental
6
Roman Elizarovaa461cf2018-04-11 13:20:29 +03007import kotlinx.coroutines.experimental.timeunit.*
8
9internal actual val DefaultDelay: Delay = DefaultExecutor
Roman Elizarov92b04852017-07-11 15:11:58 +030010
Roman Elizarov35d2c342017-07-20 14:54:39 +030011@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
12internal object DefaultExecutor : EventLoopBase(), Runnable {
Roman Elizarov92b04852017-07-11 15:11:58 +030013
Roman Elizarov35d2c342017-07-20 14:54:39 +030014 override val isCompleted: Boolean get() = false
Roman Elizarov92b04852017-07-11 15:11:58 +030015
Roman Elizarov35d2c342017-07-20 14:54:39 +030016 private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
Roman Elizarov92b04852017-07-11 15:11:58 +030017
Roman Elizarov35d2c342017-07-20 14:54:39 +030018 private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
Roman Elizarov92b04852017-07-11 15:11:58 +030019 try {
Roman Elizarov35d2c342017-07-20 14:54:39 +030020 java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
21 } catch (e: SecurityException) {
22 DEFAULT_KEEP_ALIVE
23 })
24
25 @Volatile
26 private var _thread: Thread? = null
27
28 private const val FRESH = 0
29 private const val ACTIVE = 1
30 private const val SHUTDOWN_REQ = 2
31 private const val SHUTDOWN_ACK = 3
32
33 @Volatile
34 private var debugStatus: Int = FRESH
35
Roman Elizarov94833012018-01-12 18:28:24 +030036 private val isShutdownRequested: Boolean get() {
37 val debugStatus = debugStatus
38 return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
39 }
40
Roman Elizarov35d2c342017-07-20 14:54:39 +030041 override fun run() {
Roman Elizarov35d2c342017-07-20 14:54:39 +030042 timeSource.registerTimeLoopThread()
Roman Elizarov35d2c342017-07-20 14:54:39 +030043 try {
Roman Elizarove8986b82018-01-11 18:06:30 +030044 var shutdownNanos = Long.MAX_VALUE
Roman Elizarov94833012018-01-12 18:28:24 +030045 if (!notifyStartup()) return
46 while (true) {
47 Thread.interrupted() // just reset interruption flag
48 var parkNanos = processNextEvent()
49 if (parkNanos == Long.MAX_VALUE) {
50 // nothing to do, initialize shutdown timeout
51 if (shutdownNanos == Long.MAX_VALUE) {
52 val now = timeSource.nanoTime()
53 if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
54 val tillShutdown = shutdownNanos - now
55 if (tillShutdown <= 0) return // shut thread down
56 parkNanos = parkNanos.coerceAtMost(tillShutdown)
57 } else
58 parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
59 }
60 if (parkNanos > 0) {
61 // check if shutdown was requested and bail out in this case
62 if (isShutdownRequested) return
63 timeSource.parkNanos(this, parkNanos)
Roman Elizarov35d2c342017-07-20 14:54:39 +030064 }
65 }
66 } finally {
67 _thread = null // this thread is dead
Roman Elizarov94833012018-01-12 18:28:24 +030068 acknowledgeShutdownIfNeeded()
Roman Elizarov35d2c342017-07-20 14:54:39 +030069 timeSource.unregisterTimeLoopThread()
70 // recheck if queues are empty after _thread reference was set to null (!!!)
71 if (!isEmpty) thread() // recreate thread if it is needed
72 }
Roman Elizarov92b04852017-07-11 15:11:58 +030073 }
74
Roman Elizarov35d2c342017-07-20 14:54:39 +030075 // ensure that thread is there
76 private fun thread(): Thread = _thread ?: createThreadSync()
77
78 @Synchronized
79 private fun createThreadSync() = _thread ?:
80 Thread(this, "kotlinx.coroutines.DefaultExecutor").apply {
81 _thread = this
82 isDaemon = true
83 start()
84 }
85
86 override fun unpark() {
87 timeSource.unpark(thread()) // as a side effect creates thread if it is not there
88 }
89
90 override fun isCorrectThread(): Boolean = true
91
92 // used for tests
93 @Synchronized
94 internal fun ensureStarted() {
95 assert(_thread == null) // ensure we are at a clean state
Roman Elizarove8986b82018-01-11 18:06:30 +030096 assert(debugStatus == FRESH || debugStatus == SHUTDOWN_ACK)
Roman Elizarov35d2c342017-07-20 14:54:39 +030097 debugStatus = FRESH
98 createThreadSync() // create fresh thread
99 while (debugStatus == FRESH) (this as Object).wait()
100 }
101
102 @Synchronized
Roman Elizarove8986b82018-01-11 18:06:30 +0300103 private fun notifyStartup(): Boolean {
Roman Elizarov94833012018-01-12 18:28:24 +0300104 if (isShutdownRequested) return false
Roman Elizarov35d2c342017-07-20 14:54:39 +0300105 debugStatus = ACTIVE
106 (this as Object).notifyAll()
Roman Elizarove8986b82018-01-11 18:06:30 +0300107 return true
Roman Elizarov35d2c342017-07-20 14:54:39 +0300108 }
109
110 // used for tests
111 @Synchronized
Roman Elizarov94833012018-01-12 18:28:24 +0300112 fun shutdown(timeout: Long) {
113 val deadline = System.currentTimeMillis() + timeout
114 if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
115 // loop while there is anything to do immediately or deadline passes
116 while (debugStatus != SHUTDOWN_ACK && _thread != null) {
117 _thread?.let { timeSource.unpark(it) } // wake up thread if present
118 val remaining = deadline - System.currentTimeMillis()
119 if (remaining <= 0) break
120 (this as Object).wait(timeout)
Roman Elizarov35d2c342017-07-20 14:54:39 +0300121 }
122 // restore fresh status
123 debugStatus = FRESH
124 }
125
126 @Synchronized
Roman Elizarov94833012018-01-12 18:28:24 +0300127 private fun acknowledgeShutdownIfNeeded() {
128 if (!isShutdownRequested) return
Roman Elizarov35d2c342017-07-20 14:54:39 +0300129 debugStatus = SHUTDOWN_ACK
Roman Elizarov5d94a262017-12-28 00:23:39 +0300130 resetAll() // clear queues
Roman Elizarov35d2c342017-07-20 14:54:39 +0300131 (this as Object).notifyAll()
Roman Elizarov92b04852017-07-11 15:11:58 +0300132 }
133}