Basic Channel interfaces and RendezvousChannel implementation
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 64d815b..4daf5f1 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -20,15 +20,37 @@
* Returns `true` if this continuation was cancelled. It implies that [isActive] is `false`.
*/
val isCancelled: Boolean
+
+ /**
+ * Tries to resume this continuation with a given value and returns `true` if it was successful,
+ * or `false` otherwise (it was already resumed or cancelled).
+ *
+ * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
+ * is updated (so that it cannot be cancelled anymore), but before it is actually resumed.
+ */
+ fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+
+ /**
+ * Makes this continuation cancellable. Use it with the optional `holdCancellability` parameter of the
+ * [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
+ */
+ fun initCancellability()
}
/**
* Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
+ *
+ * If the optional [holdCancellability] parameter is `true`, then the coroutine is suspended, but it is not
+ * cancellable until [CancellableContinuation.initCancellability] is invoked.
*/
-public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
+public inline suspend fun <T> suspendCancellableCoroutine(
+ holdCancellability: Boolean = false,
+ crossinline block: (CancellableContinuation<T>) -> Unit
+): T =
suspendCoroutineOrReturn { cont ->
val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
+ if (!holdCancellability) safe.initCancellability() // default: cancellable right away; when held, it stays non-cancellable until initCancellability() is invoked
block(safe)
safe.getResult()
}
@@ -63,7 +85,9 @@
const val YIELD = 3 // used by cancellable "yield"
}
- init { initParentJob(parentJob) }
+ override fun initCancellability() {
+ initParentJob(parentJob) // formerly ran in the init block; now deferred until cancellability is explicitly requested
+ }
fun getResult(): Any? {
val decision = this.decision // volatile read
@@ -80,6 +104,16 @@
override val isCancelled: Boolean
get() = getState() is Cancelled
+ override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+ while (true) { // lock-free loop on state: retry until the update succeeds or the state is no longer Active
+ val state = getState() // atomic read
+ when (state) {
+ is Active -> if (updateState(state, value, onSuccess)) return true // on a failed update fall through and re-read the state
+ else -> return false // cannot resume -- not active anymore
+ }
+ }
+ }
+
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?) {
val decision = this.decision // volatile read