ArrayChannel implementation and tests
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 4daf5f1..1b1adad 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -22,13 +22,16 @@
val isCancelled: Boolean
/**
- * Tries to resume this continuation with a given value and returns `true` if it was successful,
- * or `false` otherwise (it was already resumed or cancelled).
- *
- * An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
- * is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
+ * Tries to resume this continuation with a given value and returns a non-null object token if it was successful,
+ * or `null` otherwise (it was already resumed or cancelled). When a non-null token is returned,
+ * [completeResume] must be invoked with it.
*/
- fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
+ fun tryResume(value: T): Any?
+
+ /**
+ * Completes the resumption started by [tryResume]; [token] must be the non-null result that [tryResume] returned.
+ */
+ fun completeResume(token: Any)
/**
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
@@ -104,16 +107,20 @@
override val isCancelled: Boolean
get() = getState() is Cancelled
- override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
+ override fun tryResume(value: T): Any? {
while (true) { // lock-free loop on state
val state = getState() // atomic read
when (state) {
- is Active -> if (updateState(state, value, onSuccess)) return true
- else -> return false // cannot resume -- not active anymore
+ is Active -> if (tryUpdateState(state, value)) return state
+ else -> return null // cannot resume -- not active anymore
}
}
}
+ override fun completeResume(token: Any) {
+ completeUpdateState(token, getState())
+ }
+
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?) {
val decision = this.decision // volatile read