blob: 3c01430d5cee4b78b2ba0d5e36a40eae29e00899 [file] [log] [blame]
Vsevolod Tolstopyatove1fa1972018-06-19 15:54:58 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.selects
18
19import kotlinx.atomicfu.*
20import kotlinx.coroutines.experimental.*
21import kotlinx.coroutines.experimental.channels.*
22import kotlinx.coroutines.experimental.internal.*
23import kotlinx.coroutines.experimental.internalAnnotations.*
24import kotlinx.coroutines.experimental.intrinsics.*
25import kotlinx.coroutines.experimental.timeunit.*
26import kotlin.coroutines.experimental.*
27import kotlin.coroutines.experimental.intrinsics.*
28
29/**
30 * Scope for [select] invocation.
31 */
32public interface SelectBuilder<in R> {
33 /**
34 * Registers clause in this [select] expression without additional parameters that does not select any value.
35 */
36 public operator fun SelectClause0.invoke(block: suspend () -> R)
37
38 /**
39 * Registers clause in this [select] expression without additional parameters that selects value of type [Q].
40 */
41 public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
42
43 /**
44 * Registers clause in this [select] expression with additional parameter of type [P] that selects value of type [Q].
45 */
46 public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
47
48 /**
49 * Registers clause in this [select] expression with additional parameter nullable parameter of type [P]
50 * with the `null` value for this parameter that selects value of type [Q].
51 */
52 public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
53
54 /**
55 * Clause that selects the given [block] after a specified timeout passes.
56 * If timeout is negative or zero, [block] is selected immediately.
57 *
58 * @param time timeout time
59 * @param unit timeout unit (milliseconds by default)
60 */
61 public fun onTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> R)
62}
63
64/**
65 * Clause for [select] expression without additional parameters that does not select any value.
66 */
67public interface SelectClause0 {
68 /**
69 * Registers this clause with the specified [select] instance and [block] of code.
70 * @suppress **This is unstable API and it is subject to change.**
71 */
72 public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
73}
74
75/**
76 * Clause for [select] expression without additional parameters that selects value of type [Q].
77 */
78public interface SelectClause1<out Q> {
79 /**
80 * Registers this clause with the specified [select] instance and [block] of code.
81 * @suppress **This is unstable API and it is subject to change.**
82 */
83 public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
84}
85
86/**
87 * Clause for [select] expression with additional parameter of type [P] that selects value of type [Q].
88 */
89public interface SelectClause2<in P, out Q> {
90 /**
91 * Registers this clause with the specified [select] instance and [block] of code.
92 * @suppress **This is unstable API and it is subject to change.**
93 */
94 public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
95}
96
97/**
98 * Internal representation of select instance. This instance is called _selected_ when
99 * the clause to execute is already picked.
100 *
101 * @suppress **This is unstable API and it is subject to change.**
102 */
103public interface SelectInstance<in R> {
104 /**
105 * Returns `true` when this [select] statement had already picked a clause to execute.
106 */
107 public val isSelected: Boolean
108
109 /**
110 * Tries to select this instance.
111 */
112 public fun trySelect(idempotent: Any?): Boolean
113
114 /**
115 * Performs action atomically with [trySelect].
116 */
117 public fun performAtomicTrySelect(desc: AtomicDesc): Any?
118
119 /**
120 * Performs action atomically when [isSelected] is `false`.
121 */
122 public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
123
124 /**
125 * Returns completion continuation of this select instance.
126 * This select instance must be _selected_ first.
127 * All resumption through this instance happen _directly_ without going through dispatcher ([MODE_DIRECT]).
128 */
129 public val completion: Continuation<R>
130
131 /**
132 * Resumes this instance in a cancellable way ([MODE_CANCELLABLE]).
133 */
134 public fun resumeSelectCancellableWithException(exception: Throwable)
135
136 /**
137 * Disposes the specified handle when this instance is selected.
138 */
139 public fun disposeOnSelect(handle: DisposableHandle)
140}
141
142/**
143 * Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
144 * in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
145 * is either _selected_ or _fails_.
146 *
147 * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
148 * becomes the result of the select. If any clause _fails_, then the select invocation produces the
149 * corresponding exception. No clause is selected in this case.
150 *
151 * This select function is _biased_ to the first clause. When multiple clauses can be selected at the same time,
152 * the first one of them gets priority. Use [selectUnbiased] for an unbiased (randomized) selection among
153 * the clauses.
154
155 * There is no `default` clause for select expression. Instead, each selectable suspending function has the
156 * corresponding non-suspending version that can be used with a regular `when` expression to select one
157 * of the alternatives or to perform default (`else`) action if none of them can be immediately selected.
158 *
159 * | **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version**
160 * | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
161 * | [Job] | [join][Job.join] | [onJoin][SelectBuilder.onJoin] | [isCompleted][Job.isCompleted]
162 * | [Deferred] | [await][Deferred.await] | [onAwait][SelectBuilder.onAwait] | [isCompleted][Job.isCompleted]
163 * | [SendChannel] | [send][SendChannel.send] | [onSend][SelectBuilder.onSend] | [offer][SendChannel.offer]
164 * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][SelectBuilder.onReceive] | [poll][ReceiveChannel.poll]
165 * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
166 * | [Mutex] | [lock][Mutex.lock] | [onLock][SelectBuilder.onLock] | [tryLock][Mutex.tryLock]
167 * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] | none
168 *
169 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
170 * function is suspended, this function immediately resumes with [CancellationException].
171 *
172 * Atomicity of cancellation depends on the clause: [onSend][SelectBuilder.onSend], [onReceive][SelectBuilder.onReceive],
173 * [onReceiveOrNull][SelectBuilder.onReceiveOrNull], and [onLock][SelectBuilder.onLock] clauses are
174 * *atomically cancellable*. When select throws [CancellationException] it means that those clauses had not performed
175 * their respective operations.
176 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
177 * continue to execute even after it was cancelled from the same thread in the case when this select operation
178 * was already resumed on atomically cancellable clause and the continuation was posted for execution to the thread's queue.
179 *
180 * Note, that this function does not check for cancellation when it is not suspended.
181 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
182 */
183public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
184 // TODO suspendCoroutineUnintercepted
185 suspendCoroutineOrReturn { cont ->
186 val scope = SelectBuilderImpl(cont)
187 try {
188 builder(scope)
189 } catch (e: Throwable) {
190 scope.handleBuilderException(e)
191 }
192 scope.getResult()
193 }
194
195
196internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")
197private val UNDECIDED: Any = Symbol("UNDECIDED")
198private val RESUMED: Any = Symbol("RESUMED")
199
200@PublishedApi
201internal class SelectBuilderImpl<in R>(
202 private val delegate: Continuation<R>
203) : LockFreeLinkedListHead(), SelectBuilder<R>,
204 SelectInstance<R>, Continuation<R> {
205 // selection state is "this" (list of nodes) initially and is replaced by idempotent marker (or null) when selected
206 private val _state = atomic<Any?>(this)
207
208 // this is basically our own SafeContinuation
209 private val _result = atomic<Any?>(UNDECIDED)
210
211 // cancellability support
212 @Volatile
213 private var parentHandle: DisposableHandle? = null
214
215 /* Result state machine
216
217 +-----------+ getResult +---------------------+ resume +---------+
218 | UNDECIDED | ------------> | COROUTINE_SUSPENDED | ---------> | RESUMED |
219 +-----------+ +---------------------+ +---------+
220 |
221 | resume
222 V
223 +------------+ getResult
224 | value/Fail | -----------+
225 +------------+ |
226 ^ |
227 | |
228 +-------------------+
229 */
230
231 override val context: CoroutineContext get() = delegate.context
232
233 override val completion: Continuation<R> get() = this
234
235 private inline fun doResume(value: () -> Any?, block: () -> Unit) {
236 check(isSelected) { "Must be selected first" }
237 _result.loop { result ->
238 when {
239 result === UNDECIDED -> if (_result.compareAndSet(UNDECIDED, value())) return
240 result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED,
241 RESUMED
242 )) {
243 block()
244 return
245 }
246 else -> throw IllegalStateException("Already resumed")
247 }
248 }
249 }
250
251 // Resumes in MODE_DIRECT
252 override fun resume(value: R) {
253 doResume({ value }) {
254 delegate.resumeDirect(value)
255 }
256 }
257
258 // Resumes in MODE_DIRECT
259 override fun resumeWithException(exception: Throwable) {
260 doResume({ Fail(exception) }) {
261 delegate.resumeDirectWithException(exception)
262 }
263 }
264
265 // Resumes in MODE_CANCELLABLE
266 override fun resumeSelectCancellableWithException(exception: Throwable) {
267 doResume({ Fail(exception) }) {
268 delegate.resumeCancellableWithException(exception)
269 }
270 }
271
272 @PublishedApi
273 internal fun getResult(): Any? {
274 if (!isSelected) initCancellability()
275 var result = _result.value // atomic read
276 if (result === UNDECIDED) {
277 if (_result.compareAndSet(UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
278 result = _result.value // reread volatile var
279 }
280 when {
281 result === RESUMED -> throw IllegalStateException("Already resumed")
282 result is Fail -> throw result.exception
283 else -> return result // either COROUTINE_SUSPENDED or data
284 }
285 }
286
287 private fun initCancellability() {
288 val parent = context[Job] ?: return
289 val newRegistration = parent.invokeOnCompletion(
290 onCancelling = true, handler = SelectOnCancellation(parent).asHandler)
291 parentHandle = newRegistration
292 // now check our state _after_ registering
293 if (isSelected) newRegistration.dispose()
294 }
295
296 private inner class SelectOnCancellation(job: Job) : JobCancellationNode<Job>(job) {
297 // Note: may be invoked multiple times, but only the first trySelect succeeds anyway
298 override fun invoke(cause: Throwable?) {
299 if (trySelect(null))
300 resumeSelectCancellableWithException(job.getCancellationException())
301 }
302 override fun toString(): String = "SelectOnCancellation[${this@SelectBuilderImpl}]"
303 }
304
305 private val state: Any? get() {
306 _state.loop { state ->
307 if (state !is OpDescriptor) return state
308 state.perform(this)
309 }
310 }
311
312 @PublishedApi
313 internal fun handleBuilderException(e: Throwable) {
314 if (trySelect(null))
315 resumeWithException(e)
316 else
317 handleCoroutineException(context, e)
318 }
319
320 override val isSelected: Boolean get() = state !== this
321
322 override fun disposeOnSelect(handle: DisposableHandle) {
323 val node = DisposeNode(handle)
324 while (true) { // lock-free loop on state
325 val state = this.state
326 if (state === this) {
327 if (addLastIf(node, { this.state === this }))
328 return
329 } else { // already selected
330 handle.dispose()
331 return
332 }
333 }
334 }
335
336 private fun doAfterSelect() {
337 parentHandle?.dispose()
338 forEach<DisposeNode> {
339 it.handle.dispose()
340 }
341 }
342
343 // it is just like start(), but support idempotent start
344 override fun trySelect(idempotent: Any?): Boolean {
345 check(idempotent !is OpDescriptor) { "cannot use OpDescriptor as idempotent marker"}
346 while (true) { // lock-free loop on state
347 val state = this.state
348 when {
349 state === this -> {
350 if (_state.compareAndSet(this, idempotent)) {
351 doAfterSelect()
352 return true
353 }
354 }
355 // otherwise -- already selected
356 idempotent == null -> return false // already selected
357 state === idempotent -> return true // was selected with this marker
358 else -> return false
359 }
360 }
361 }
362
363 override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
364 override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
365
366 private inner class AtomicSelectOp(
367 @JvmField val desc: AtomicDesc,
368 @JvmField val select: Boolean
369 ) : AtomicOp<Any?>() {
370 override fun prepare(affected: Any?): Any? {
371 // only originator of operation makes preparation move of installing descriptor into this selector's state
372 // helpers should never do it, or risk ruining progress when they come late
373 if (affected == null) {
374 // we are originator (affected reference is not null if helping)
375 prepareIfNotSelected()?.let { return it }
376 }
377 return desc.prepare(this)
378 }
379
380 override fun complete(affected: Any?, failure: Any?) {
381 completeSelect(failure)
382 desc.complete(this, failure)
383 }
384
385 fun prepareIfNotSelected(): Any? {
386 _state.loop { state ->
387 when {
388 state === this@AtomicSelectOp -> return null // already in progress
389 state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help
390 state === this@SelectBuilderImpl -> {
391 if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp))
392 return null // success
393 }
394 else -> return ALREADY_SELECTED
395 }
396 }
397 }
398
399 private fun completeSelect(failure: Any?) {
400 val selectSuccess = select && failure == null
401 val update = if (selectSuccess) null else this@SelectBuilderImpl
402 if (_state.compareAndSet(this@AtomicSelectOp, update)) {
403 if (selectSuccess)
404 doAfterSelect()
405 }
406 }
407 }
408
409 override fun SelectClause0.invoke(block: suspend () -> R) {
410 registerSelectClause0(this@SelectBuilderImpl, block)
411 }
412
413 override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
414 registerSelectClause1(this@SelectBuilderImpl, block)
415 }
416
417 override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) {
418 registerSelectClause2(this@SelectBuilderImpl, param, block)
419 }
420
421 override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
422 if (time <= 0L) {
423 if (trySelect(null))
424 block.startCoroutineUndispatched(completion)
425 return
426 }
427 val action = Runnable {
428 // todo: we could have replaced startCoroutine with startCoroutineUndispatched
429 // But we need a way to know that Delay.invokeOnTimeout had used the right thread
430 if (trySelect(null))
431 block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
432 }
433 disposeOnSelect(context.delay.invokeOnTimeout(time, unit, action))
434 }
435
436 private class DisposeNode(
437 @JvmField val handle: DisposableHandle
438 ) : LockFreeLinkedListNode()
439
440 private class Fail(
441 @JvmField val exception: Throwable
442 )
443}