blob: 5bd1668db9b3fdd1810f43061d003c4a47006e1e [file] [log] [blame]
Roman Elizarovf29203c2018-01-11 12:39:36 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental
18
19import kotlinx.coroutines.experimental.internal.*
Roman Elizarovaa461cf2018-04-11 13:20:29 +030020import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarovf29203c2018-01-11 12:39:36 +030021import kotlin.coroutines.experimental.*
22
23@Suppress("PrivatePropertyName")
24private val UNDEFINED = Symbol("UNDEFINED")
25
26internal class DispatchedContinuation<in T>(
Roman Elizarovaa461cf2018-04-11 13:20:29 +030027 @JvmField val dispatcher: CoroutineDispatcher,
28 @JvmField val continuation: Continuation<T>
Roman Elizarovf29203c2018-01-11 12:39:36 +030029) : Continuation<T> by continuation, DispatchedTask<T> {
30 private var _state: Any? = UNDEFINED
31 public override var resumeMode: Int = 0
32
33 override fun takeState(): Any? {
34 val state = _state
35 check(state !== UNDEFINED) // fail-fast if repeatedly invoked
36 _state = UNDEFINED
37 return state
38 }
39
40 override val delegate: Continuation<T>
41 get() = this
42
43 override fun resume(value: T) {
44 val context = continuation.context
45 if (dispatcher.isDispatchNeeded(context)) {
46 _state = value
47 resumeMode = MODE_ATOMIC_DEFAULT
48 dispatcher.dispatch(context, this)
49 } else
50 resumeUndispatched(value)
51 }
52
53 override fun resumeWithException(exception: Throwable) {
54 val context = continuation.context
55 if (dispatcher.isDispatchNeeded(context)) {
56 _state = CompletedExceptionally(exception)
57 resumeMode = MODE_ATOMIC_DEFAULT
58 dispatcher.dispatch(context, this)
59 } else
60 resumeUndispatchedWithException(exception)
61 }
62
63 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
64 inline fun resumeCancellable(value: T) {
65 val context = continuation.context
66 if (dispatcher.isDispatchNeeded(context)) {
67 _state = value
68 resumeMode = MODE_CANCELLABLE
69 dispatcher.dispatch(context, this)
70 } else
71 resumeUndispatched(value)
72 }
73
74 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
75 inline fun resumeCancellableWithException(exception: Throwable) {
76 val context = continuation.context
77 if (dispatcher.isDispatchNeeded(context)) {
78 _state = CompletedExceptionally(exception)
79 resumeMode = MODE_CANCELLABLE
80 dispatcher.dispatch(context, this)
81 } else
82 resumeUndispatchedWithException(exception)
83 }
84
85 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
86 inline fun resumeUndispatched(value: T) {
87 withCoroutineContext(context) {
88 continuation.resume(value)
89 }
90 }
91
92 @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
93 inline fun resumeUndispatchedWithException(exception: Throwable) {
94 withCoroutineContext(context) {
95 continuation.resumeWithException(exception)
96 }
97 }
98
99 // used by "yield" implementation
100 internal fun dispatchYield(value: T) {
101 val context = continuation.context
102 _state = value
103 resumeMode = MODE_CANCELLABLE
104 dispatcher.dispatch(context, this)
105 }
106
107 override fun toString(): String =
108 "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
109}
110
111internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
112 is DispatchedContinuation -> resumeCancellable(value)
113 else -> resume(value)
114}
115
116internal fun <T> Continuation<T>.resumeCancellableWithException(exception: Throwable) = when (this) {
117 is DispatchedContinuation -> resumeCancellableWithException(exception)
118 else -> resumeWithException(exception)
119}
120
121internal fun <T> Continuation<T>.resumeDirect(value: T) = when (this) {
122 is DispatchedContinuation -> continuation.resume(value)
123 else -> resume(value)
124}
125
126internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable) = when (this) {
127 is DispatchedContinuation -> continuation.resumeWithException(exception)
128 else -> resumeWithException(exception)
129}
130
131/**
132 * @suppress **This is unstable API and it is subject to change.**
133 */
134public interface DispatchedTask<in T> : Runnable {
135 public val delegate: Continuation<T>
136 public val resumeMode: Int get() = MODE_CANCELLABLE
137
138 public fun takeState(): Any?
139
140 @Suppress("UNCHECKED_CAST")
141 public fun <T> getSuccessfulResult(state: Any?): T =
142 state as T
143
144 public fun getExceptionalResult(state: Any?): Throwable? =
Vsevolod Tolstopyatovc1092d52018-04-12 20:22:25 +0300145 (state as? CompletedExceptionally)?.cause
Roman Elizarovf29203c2018-01-11 12:39:36 +0300146
147 public override fun run() {
148 try {
149 val delegate = delegate as DispatchedContinuation<T>
150 val continuation = delegate.continuation
151 val context = continuation.context
152 val job = if (resumeMode.isCancellableMode) context[Job] else null
153 val state = takeState() // NOTE: Must take state in any case, even if cancelled
154 withCoroutineContext(context) {
155 if (job != null && !job.isActive)
156 continuation.resumeWithException(job.getCancellationException())
157 else {
158 val exception = getExceptionalResult(state)
159 if (exception != null)
160 continuation.resumeWithException(exception)
161 else
162 continuation.resume(getSuccessfulResult(state))
163 }
164 }
165 } catch (e: Throwable) {
166 throw DispatchException("Unexpected exception running $this", e)
167 }
168 }
169}
170
171/**
172 * @suppress **This is unstable API and it is subject to change.**
173 */
174public fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
175 var useMode = mode
176 val delegate = this.delegate
177 if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
178 // dispatch directly using this instance's Runnable implementation
179 val dispatcher = delegate.dispatcher
180 val context = delegate.context
Roman Elizarov9a0d8ac2018-01-11 14:32:19 +0300181 if (dispatcher.isDispatchNeeded(context)) {
Roman Elizarovf29203c2018-01-11 12:39:36 +0300182 dispatcher.dispatch(context, this)
183 return // and that's it -- dispatched via fast-path
184 } else {
185 useMode = MODE_UNDISPATCHED
186 }
187 }
188 // slow-path - use delegate
189 val state = takeState()
190 val exception = getExceptionalResult(state)
191 if (exception != null) {
192 delegate.resumeWithExceptionMode(exception, useMode)
193 } else {
194 delegate.resumeMode(getSuccessfulResult(state), useMode)
195 }
196}