blob: 450163c668c09cdace5875b870057d75db964d65 [file] [log] [blame]
Roman Elizarovf29203c2018-01-11 12:39:36 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf29203c2018-01-11 12:39:36 +03003 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005package kotlinx.coroutines
Roman Elizarovf29203c2018-01-11 12:39:36 +03006
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007import kotlinx.coroutines.internal.*
8import kotlin.coroutines.*
Vsevolod Tolstopyatov36c3ba12018-10-14 23:45:28 +03009import kotlin.jvm.*
Roman Elizarovf29203c2018-01-11 12:39:36 +030010
11@Suppress("PrivatePropertyName")
Vsevolod Tolstopyatovd8cdc9f2018-11-13 13:30:16 +030012@SharedImmutable
Vsevolod Tolstopyatov09b9d6c2018-10-17 19:51:50 +030013private val UNDEFINED = Symbol("UNDEFINED")
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030014
Roman Elizarov51738242018-12-21 16:41:39 +030015/**
16 * Executes given [block] as part of current event loop, updating current continuation
17 * mode and state if continuation is not resumed immediately.
18 * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
19 * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
20 */
21private inline fun DispatchedContinuation<*>.executeUnconfined(
22 contState: Any?, mode: Int, doYield: Boolean = false,
23 block: () -> Unit
24) : Boolean {
25 val eventLoop = ThreadLocalEventLoop.eventLoop
26 // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
27 if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
28 return if (eventLoop.isUnconfinedLoopActive) {
29 // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
30 _state = contState
31 resumeMode = mode
32 eventLoop.dispatchUnconfined(this)
33 true // queued into the active loop
34 } else {
35 // Was not active -- run event loop until all unconfined tasks are executed
36 runUnconfinedEventLoop(eventLoop, block = block)
37 false
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030038 }
Roman Elizarov51738242018-12-21 16:41:39 +030039}
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030040
Roman Elizarov51738242018-12-21 16:41:39 +030041private fun DispatchedTask<*>.resumeUnconfined() {
42 val eventLoop = ThreadLocalEventLoop.eventLoop
43 if (eventLoop.isUnconfinedLoopActive) {
44 // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
45 eventLoop.dispatchUnconfined(this)
46 } else {
47 // Was not active -- run event loop until all unconfined tasks are executed
48 runUnconfinedEventLoop(eventLoop) {
49 resume(delegate, MODE_UNDISPATCHED)
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030050 }
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030051 }
Roman Elizarov51738242018-12-21 16:41:39 +030052}
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030053
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +030054private inline fun DispatchedTask<*>.runUnconfinedEventLoop(
Roman Elizarov51738242018-12-21 16:41:39 +030055 eventLoop: EventLoop,
56 block: () -> Unit
57) {
58 eventLoop.incrementUseCount(unconfined = true)
59 try {
60 block()
61 while (true) {
62 // break when all unconfined continuations where executed
63 if (!eventLoop.processUnconfinedEvent()) break
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030064 }
Roman Elizarov51738242018-12-21 16:41:39 +030065 } catch (e: Throwable) {
66 /*
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +030067 * This exception doesn't happen normally, only if we have a bug in implementation.
68 * Report it as a fatal exception.
Roman Elizarov51738242018-12-21 16:41:39 +030069 */
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +030070 handleFatalException(e, null)
Roman Elizarov51738242018-12-21 16:41:39 +030071 } finally {
72 eventLoop.decrementUseCount(unconfined = true)
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030073 }
74}
Roman Elizarovf29203c2018-01-11 12:39:36 +030075
76internal class DispatchedContinuation<in T>(
Roman Elizarovaa461cf2018-04-11 13:20:29 +030077 @JvmField val dispatcher: CoroutineDispatcher,
78 @JvmField val continuation: Continuation<T>
Vsevolod Tolstopyatov675c30c2018-11-02 15:55:20 +030079) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +030080 @JvmField
81 @Suppress("PropertyName")
82 internal var _state: Any? = UNDEFINED
Vsevolod Tolstopyatov675c30c2018-11-02 15:55:20 +030083 override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
84 override fun getStackTraceElement(): StackTraceElement? = null
Vsevolod Tolstopyatovae857972018-10-16 15:47:26 +030085 @JvmField // pre-cached value to avoid ctx.fold on every resumption
86 internal val countOrElement = threadContextElements(context)
Roman Elizarovf29203c2018-01-11 12:39:36 +030087
88 override fun takeState(): Any? {
89 val state = _state
90 check(state !== UNDEFINED) // fail-fast if repeatedly invoked
91 _state = UNDEFINED
92 return state
93 }
94
95 override val delegate: Continuation<T>
96 get() = this
97
Roman Elizarov0950dfa2018-07-13 10:33:25 +030098 override fun resumeWith(result: Result<T>) {
Roman Elizarovf29203c2018-01-11 12:39:36 +030099 val context = continuation.context
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +0300100 val state = result.toState()
Roman Elizarovf29203c2018-01-11 12:39:36 +0300101 if (dispatcher.isDispatchNeeded(context)) {
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +0300102 _state = state
Roman Elizarovf29203c2018-01-11 12:39:36 +0300103 resumeMode = MODE_ATOMIC_DEFAULT
104 dispatcher.dispatch(context, this)
Vsevolod Tolstopyatov838b0522018-09-28 16:17:26 +0300105 } else {
Roman Elizarov51738242018-12-21 16:41:39 +0300106 executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
Vsevolod Tolstopyatovcd37d8e2018-10-18 16:59:46 +0300107 withCoroutineContext(this.context, countOrElement) {
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +0300108 continuation.resumeWith(result)
109 }
110 }
Vsevolod Tolstopyatov838b0522018-09-28 16:17:26 +0300111 }
Roman Elizarovf29203c2018-01-11 12:39:36 +0300112 }
113
114 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
115 inline fun resumeCancellable(value: T) {
Roman Elizarovf29203c2018-01-11 12:39:36 +0300116 if (dispatcher.isDispatchNeeded(context)) {
117 _state = value
118 resumeMode = MODE_CANCELLABLE
119 dispatcher.dispatch(context, this)
Vsevolod Tolstopyatov838b0522018-09-28 16:17:26 +0300120 } else {
Roman Elizarov51738242018-12-21 16:41:39 +0300121 executeUnconfined(value, MODE_CANCELLABLE) {
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +0300122 if (!resumeCancelled()) {
123 resumeUndispatched(value)
124 }
Vsevolod Tolstopyatov838b0522018-09-28 16:17:26 +0300125 }
126 }
Roman Elizarovf29203c2018-01-11 12:39:36 +0300127 }
128
129 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
130 inline fun resumeCancellableWithException(exception: Throwable) {
131 val context = continuation.context
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +0300132 val state = CompletedExceptionally(exception)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300133 if (dispatcher.isDispatchNeeded(context)) {
134 _state = CompletedExceptionally(exception)
135 resumeMode = MODE_CANCELLABLE
136 dispatcher.dispatch(context, this)
Vsevolod Tolstopyatov838b0522018-09-28 16:17:26 +0300137 } else {
Roman Elizarov51738242018-12-21 16:41:39 +0300138 executeUnconfined(state, MODE_CANCELLABLE) {
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +0300139 if (!resumeCancelled()) {
140 resumeUndispatchedWithException(exception)
141 }
Vsevolod Tolstopyatov838b0522018-09-28 16:17:26 +0300142 }
143 }
144 }
145
146 @Suppress("NOTHING_TO_INLINE")
147 inline fun resumeCancelled(): Boolean {
148 val job = context[Job]
149 if (job != null && !job.isActive) {
150 resumeWithException(job.getCancellationException())
151 return true
152 }
153
154 return false
Roman Elizarovf29203c2018-01-11 12:39:36 +0300155 }
156
157 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
158 inline fun resumeUndispatched(value: T) {
Vsevolod Tolstopyatovae857972018-10-16 15:47:26 +0300159 withCoroutineContext(context, countOrElement) {
Roman Elizarovf29203c2018-01-11 12:39:36 +0300160 continuation.resume(value)
161 }
162 }
163
164 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
165 inline fun resumeUndispatchedWithException(exception: Throwable) {
Vsevolod Tolstopyatovae857972018-10-16 15:47:26 +0300166 withCoroutineContext(context, countOrElement) {
Vsevolod Tolstopyatov675c30c2018-11-02 15:55:20 +0300167 continuation.resumeWithStackTrace(exception)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300168 }
169 }
170
171 // used by "yield" implementation
172 internal fun dispatchYield(value: T) {
173 val context = continuation.context
174 _state = value
175 resumeMode = MODE_CANCELLABLE
Vsevolod Tolstopyatov3ac73f62018-07-26 16:09:33 +0300176 dispatcher.dispatchYield(context, this)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300177 }
178
179 override fun toString(): String =
180 "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
181}
182
183internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
184 is DispatchedContinuation -> resumeCancellable(value)
185 else -> resume(value)
186}
187
188internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
189 is DispatchedContinuation -> resumeCancellableWithException(exception)
Vsevolod Tolstopyatov675c30c2018-11-02 15:55:20 +0300190 else -> resumeWithStackTrace(exception)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300191}
192
193internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
194 is DispatchedContinuation -> continuation.resume(value)
195 else -> resume(value)
196}
197
198internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
Vsevolod Tolstopyatov675c30c2018-11-02 15:55:20 +0300199 is DispatchedContinuation -> continuation.resumeWithStackTrace(exception)
200 else -> resumeWithStackTrace(exception)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300201}
202
Roman Elizarovaa374a92018-11-02 00:10:39 +0300203internal abstract class DispatchedTask<in T>(
Roman Elizarov51738242018-12-21 16:41:39 +0300204 @JvmField public var resumeMode: Int
Roman Elizarovaa374a92018-11-02 00:10:39 +0300205) : SchedulerTask() {
Roman Elizarov2b8218a2019-04-11 01:41:22 +0300206 internal abstract val delegate: Continuation<T>
Roman Elizarovf29203c2018-01-11 12:39:36 +0300207
Roman Elizarov2b8218a2019-04-11 01:41:22 +0300208 internal abstract fun takeState(): Any?
209
210 internal open fun cancelResult(state: Any?, cause: Throwable) {}
Roman Elizarovf29203c2018-01-11 12:39:36 +0300211
212 @Suppress("UNCHECKED_CAST")
Roman Elizarov2b8218a2019-04-11 01:41:22 +0300213 internal open fun <T> getSuccessfulResult(state: Any?): T =
Roman Elizarovf29203c2018-01-11 12:39:36 +0300214 state as T
215
Roman Elizarov2b8218a2019-04-11 01:41:22 +0300216 internal fun getExceptionalResult(state: Any?): Throwable? =
Vsevolod Tolstopyatovc1092d52018-04-12 20:22:25 +0300217 (state as? CompletedExceptionally)?.cause
Roman Elizarovf29203c2018-01-11 12:39:36 +0300218
Roman Elizarovaa374a92018-11-02 00:10:39 +0300219 public final override fun run() {
Roman Elizarove64d9b72018-10-22 05:31:23 +0300220 val taskContext = this.taskContext
Vsevolod Tolstopyatovc022ab62019-05-14 15:10:09 +0300221 var fatalException: Throwable? = null
Roman Elizarovf29203c2018-01-11 12:39:36 +0300222 try {
223 val delegate = delegate as DispatchedContinuation<T>
224 val continuation = delegate.continuation
225 val context = continuation.context
Roman Elizarovf29203c2018-01-11 12:39:36 +0300226 val state = takeState() // NOTE: Must take state in any case, even if cancelled
Vsevolod Tolstopyatovae857972018-10-16 15:47:26 +0300227 withCoroutineContext(context, delegate.countOrElement) {
Vsevolod Tolstopyatovc022ab62019-05-14 15:10:09 +0300228 val exception = getExceptionalResult(state)
229 val job = if (resumeMode.isCancellableMode) context[Job] else null
230 /*
231 * Check whether continuation was originally resumed with an exception.
232 * If so, it dominates cancellation, otherwise the original exception
233 * will be silently lost.
234 */
235 if (exception == null && job != null && !job.isActive) {
Roman Elizarov2b8218a2019-04-11 01:41:22 +0300236 val cause = job.getCancellationException()
237 cancelResult(state, cause)
Roman Elizarov9e9c9a32019-04-24 10:54:51 +0300238 continuation.resumeWithStackTrace(cause)
Roman Elizarov2b8218a2019-04-11 01:41:22 +0300239 } else {
Vsevolod Tolstopyatovc022ab62019-05-14 15:10:09 +0300240 if (exception != null) continuation.resumeWithStackTrace(exception)
241 else continuation.resume(getSuccessfulResult(state))
Roman Elizarovf29203c2018-01-11 12:39:36 +0300242 }
243 }
244 } catch (e: Throwable) {
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +0300245 // This instead of runCatching to have nicer stacktrace and debug experience
Vsevolod Tolstopyatovc022ab62019-05-14 15:10:09 +0300246 fatalException = e
Roman Elizarov16e20342018-10-21 20:08:49 +0300247 } finally {
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +0300248 val result = runCatching { taskContext.afterTask() }
Vsevolod Tolstopyatovc022ab62019-05-14 15:10:09 +0300249 handleFatalException(fatalException, result.exceptionOrNull())
Roman Elizarovf29203c2018-01-11 12:39:36 +0300250 }
251 }
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +0300252
253 /**
254 * Machinery that handles fatal exceptions in kotlinx.coroutines.
255 * There are two kinds of fatal exceptions:
256 *
257 * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
258 * the library or the compiler has a bug that breaks internal invariants.
259 * They usually have specific workarounds, but require careful study of the cause and should
260 * be reported to the maintainers and fixed on the library's side anyway.
261 *
262 * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
263 * While a user code can trigger such exception by providing an improper implementation of [ThreadContextElement],
264 * we can't ignore it because it may leave coroutine in the inconsistent state.
265 * If you encounter such exception, you can either disable this context element or wrap it into
266 * another context element that catches all exceptions and handles it in the application specific manner.
267 *
268 * Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
269 * a failed coroutine, but such exceptions should be reported anyway.
270 */
271 internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
272 if (exception === null && finallyException === null) return
273 if (exception !== null && finallyException !== null) {
274 exception.addSuppressedThrowable(finallyException)
275 }
276
277 val cause = exception ?: finallyException
Vsevolod Tolstopyatov132c1e32019-03-07 15:15:40 +0300278 val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +0300279 "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!)
Vsevolod Tolstopyatov4651b572019-03-13 14:29:19 +0300280 handleCoroutineException(this.delegate.context, reason)
Vsevolod Tolstopyatov0c8789d2019-03-07 15:07:41 +0300281 }
Roman Elizarovf29203c2018-01-11 12:39:36 +0300282}
283
Vsevolod Tolstopyatovbf455762018-10-23 11:38:14 +0300284internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
Roman Elizarov51738242018-12-21 16:41:39 +0300285 executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
Robert Golusińskic33ef612018-10-23 11:29:56 +0200286 run()
287 }
Robert Golusińskic33ef612018-10-23 11:29:56 +0200288
Vsevolod Tolstopyatov1f7b2d82018-10-09 15:57:51 +0300289internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
Roman Elizarovf29203c2018-01-11 12:39:36 +0300290 val delegate = this.delegate
291 if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
292 // dispatch directly using this instance's Runnable implementation
293 val dispatcher = delegate.dispatcher
294 val context = delegate.context
Roman Elizarov9a0d8ac2018-01-11 14:32:19 +0300295 if (dispatcher.isDispatchNeeded(context)) {
Roman Elizarovf29203c2018-01-11 12:39:36 +0300296 dispatcher.dispatch(context, this)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300297 } else {
Roman Elizarov51738242018-12-21 16:41:39 +0300298 resumeUnconfined()
Roman Elizarovf29203c2018-01-11 12:39:36 +0300299 }
Vsevolod Tolstopyatovfaa47742018-10-18 11:54:56 +0300300 } else {
301 resume(delegate, mode)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300302 }
Vsevolod Tolstopyatovfaa47742018-10-18 11:54:56 +0300303}
Vsevolod Tolstopyatovfd54bc42018-10-17 18:37:36 +0300304
Vsevolod Tolstopyatovfaa47742018-10-18 11:54:56 +0300305internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
306 // slow-path - use delegate
307 val state = takeState()
308 val exception = getExceptionalResult(state)
309 if (exception != null) {
310 delegate.resumeWithExceptionMode(exception, useMode)
311 } else {
312 delegate.resumeMode(getSuccessfulResult(state), useMode)
Roman Elizarovf29203c2018-01-11 12:39:36 +0300313 }
314}
Vsevolod Tolstopyatov675c30c2018-11-02 15:55:20 +0300315
316
317@Suppress("NOTHING_TO_INLINE")
Vsevolod Tolstopyatov1032f582018-11-27 18:13:47 +0300318internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
319 resumeWith(Result.failure(recoverStackTrace(exception, this)))
320}