Select statement with onSend/onReceive/onAwait clauses
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index e011e2b..4db38bb 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -93,10 +93,10 @@
// --------------- implementation ---------------
private open class StandaloneCoroutine(
- val parentContext: CoroutineContext,
+ override val parentContext: CoroutineContext,
active: Boolean
-) : AbstractCoroutine<Unit>(parentContext, active) {
- override fun afterCompletion(state: Any?) {
+) : AbstractCoroutine<Unit>(active) {
+ override fun afterCompletion(state: Any?, mode: Int) {
// note the use of the parent's job context below!
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
}
@@ -119,13 +119,13 @@
}
private class BlockingCoroutine<T>(
- context: CoroutineContext,
+ override val parentContext: CoroutineContext,
val blockedThread: Thread,
val hasPrivateEventLoop: Boolean
-) : AbstractCoroutine<T>(context, active = true) {
- val eventLoop: EventLoop? = context[ContinuationInterceptor] as? EventLoop
+) : AbstractCoroutine<T>(active = true) {
+ val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
- override fun afterCompletion(state: Any?) {
+ override fun afterCompletion(state: Any?, mode: Int) {
if (Thread.currentThread() != blockedThread)
LockSupport.unpark(blockedThread)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 3877fc8..3663878 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -19,7 +19,7 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
import kotlin.coroutines.experimental.Continuation
-import kotlin.coroutines.experimental.ContinuationInterceptor
+import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
import kotlin.coroutines.experimental.suspendCoroutine
@@ -57,18 +57,27 @@
* Tries to resume this continuation with a given value and returns non-null object token if it was successful,
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
* [completeResume] must be invoked with it.
+ *
+ * When [idempotent] is not `null`, this function performs _idempotent_ operation, so that
+ * further invocations with the same non-null reference produce the same result.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
*/
- public fun tryResume(value: T): Any?
+ public fun tryResume(value: T, idempotent: Any? = null): Any?
/**
* Tries to resume this continuation with a given exception and returns non-null object token if it was successful,
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
* [completeResume] must be invoked with it.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
*/
public fun tryResumeWithException(exception: Throwable): Any?
/**
* Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
*/
public fun completeResume(token: Any)
@@ -113,7 +122,7 @@
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineOrReturn { cont ->
- val cancellable = CancellableContinuationImpl(cont, getParentJobOrAbort(cont), active = true)
+ val cancellable = CancellableContinuationImpl(cont, active = true)
if (!holdCancellability) cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
@@ -141,57 +150,66 @@
}
@PublishedApi
-internal fun getParentJobOrAbort(cont: Continuation<*>): Job? {
- val job = cont.context[Job]
- // fast path when parent job is already complete (we don't even construct CancellableContinuationImpl object)
- if (job != null && !job.isActive) throw job.getCompletionException()
- return job
-}
-
-@PublishedApi
internal open class CancellableContinuationImpl<in T>(
- private val delegate: Continuation<T>,
- private val parentJob: Job?,
+ @JvmField
+ protected val delegate: Continuation<T>,
active: Boolean
-) : AbstractCoroutine<T>(delegate.context, active), CancellableContinuation<T> {
+) : AbstractCoroutine<T>(active), CancellableContinuation<T> {
@Volatile
private var decision = UNDECIDED
- private companion object {
+ override val parentContext: CoroutineContext
+ get() = delegate.context
+
+ protected companion object {
+ @JvmStatic
val DECISION: AtomicIntegerFieldUpdater<CancellableContinuationImpl<*>> =
AtomicIntegerFieldUpdater.newUpdater(CancellableContinuationImpl::class.java, "decision")
const val UNDECIDED = 0
const val SUSPENDED = 1
const val RESUMED = 2
- const val YIELD = 3 // used by cancellable "yield"
- const val UNDISPATCHED = 4 // used by "undispatchedXXX"
+
+ const val MODE_UNDISPATCHED = 1
+ const val MODE_DIRECT = 2
+
+ @Suppress("UNCHECKED_CAST")
+ fun <T> getSuccessfulResult(state: Any?): T = if (state is CompletedIdempotentResult) state.result as T else state as T
}
override fun initCancellability() {
- initParentJob(parentJob)
+ initParentJob(delegate.context[Job])
}
@PublishedApi
internal fun getResult(): Any? {
val decision = this.decision // volatile read
- when (decision) {
- UNDECIDED -> if (DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
- YIELD -> return COROUTINE_SUSPENDED
- }
+ if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
// otherwise, afterCompletion was already invoked, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw state.exception
- return state
+ return getSuccessfulResult(state)
}
override val isCancelled: Boolean get() = state is Cancelled
- override fun tryResume(value: T): Any? {
+ override fun tryResume(value: T, idempotent: Any?): Any? {
while (true) { // lock-free loop on state
val state = this.state // atomic read
when (state) {
- is Incomplete -> if (tryUpdateState(state, value)) return state
+ is Incomplete -> {
+ val idempotentStart = state.idempotentStart
+ val update: Any? = if (idempotent == null && idempotentStart == null) value else
+ CompletedIdempotentResult(idempotentStart, idempotent, value, state)
+ if (tryUpdateState(state, update)) return state
+ }
+ is CompletedIdempotentResult -> {
+ if (state.idempotentResume === idempotent) {
+ check(state.result === value) { "Non-idempotent resume" }
+ return state.token
+ } else
+ return null
+ }
else -> return null // cannot resume -- not active anymore
}
}
@@ -201,56 +219,69 @@
while (true) { // lock-free loop on state
val state = this.state // atomic read
when (state) {
- is Incomplete -> if (tryUpdateState(state, CompletedExceptionally(exception))) return state
+ is Incomplete -> {
+ if (tryUpdateState(state, CompletedExceptionally(state.idempotentStart, exception))) return state
+ }
else -> return null // cannot resume -- not active anymore
}
}
}
override fun completeResume(token: Any) {
- completeUpdateState(token, state)
+ completeUpdateState(token, state, mode = 0)
}
- @Suppress("UNCHECKED_CAST")
- override fun afterCompletion(state: Any?) {
+ override fun afterCompletion(state: Any?, mode: Int) {
val decision = this.decision // volatile read
if (decision == UNDECIDED && DECISION.compareAndSet(this, UNDECIDED, RESUMED)) return // will get result in getResult
// otherwise, getResult has already commenced, i.e. it was resumed later or in other thread
- when {
- decision == UNDISPATCHED -> undispatchedCompletion(state)
- state is CompletedExceptionally -> delegate.resumeWithException(state.exception)
- decision == YIELD && delegate is DispatchedContinuation -> delegate.resumeYield(parentJob, state as T)
- else -> delegate.resume(state as T)
+ if (state is CompletedExceptionally) {
+ val exception = state.exception
+ when (mode) {
+ 0 -> delegate.resumeWithException(exception)
+ MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWithException(exception)
+ MODE_DIRECT -> {
+ if (delegate is DispatchedContinuation)
+ delegate.continuation.resumeWithException(exception)
+ else
+ delegate.resumeWithException(exception)
+ }
+ else -> error("Invalid mode $mode")
+ }
+ } else {
+ val value = getSuccessfulResult<T>(state)
+ when (mode) {
+ 0 -> delegate.resume(value)
+ MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatched(value)
+ MODE_DIRECT -> {
+ if (delegate is DispatchedContinuation)
+ delegate.continuation.resume(value)
+ else
+ delegate.resume(value)
+ }
+ else -> error("Invalid mode $mode")
+ }
}
}
- @Suppress("UNCHECKED_CAST")
- private fun undispatchedCompletion(state: Any?) {
- delegate as DispatchedContinuation // type assertion -- was checked in resumeUndispatched
- if (state is CompletedExceptionally)
- delegate.resumeUndispatchedWithException(state.exception)
- else
- delegate.resumeUndispatched(state as T)
- }
-
- // can only be invoked in the same thread as getResult (see "yield"), afterCompletion may be concurrent
- fun resumeYield(value: T) {
- if ((context[ContinuationInterceptor] as? CoroutineDispatcher)?.isDispatchNeeded(context) == true)
- DECISION.compareAndSet(this, UNDECIDED, YIELD) // try mark as needing dispatch
- resume(value)
- }
-
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
- DECISION.compareAndSet(this@CancellableContinuationImpl, SUSPENDED, UNDISPATCHED)
- resume(value)
+ resume(value, MODE_UNDISPATCHED)
}
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
- DECISION.compareAndSet(this@CancellableContinuationImpl, SUSPENDED, UNDISPATCHED)
- resumeWithException(exception)
+ resumeWithException(exception, MODE_UNDISPATCHED)
+ }
+
+ private class CompletedIdempotentResult(
+ idempotentStart: Any?,
+ @JvmField val idempotentResume: Any?,
+ @JvmField val result: Any?,
+ @JvmField val token: Incomplete
+ ) : CompletedIdempotentStart(idempotentStart) {
+ override fun toString(): String = "CompletedIdempotentResult[$result]"
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index b581336..ac7e1c4 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -94,8 +94,8 @@
}
internal class DispatchedContinuation<in T>(
- val dispatcher: CoroutineDispatcher,
- val continuation: Continuation<T>
+ @JvmField val dispatcher: CoroutineDispatcher,
+ @JvmField val continuation: Continuation<T>
): Continuation<T> by continuation {
override fun resume(value: T) {
val context = continuation.context
@@ -132,20 +132,15 @@
}
// used by "yield" implementation
- fun resumeYield(job: Job?, value: T) {
+ internal fun dispatchYield(job: Job?, value: T) {
val context = continuation.context
- if (dispatcher.isDispatchNeeded(context))
- dispatcher.dispatch(context, Runnable {
- withCoroutineContext(context) {
- if (job?.isCompleted == true)
- continuation.resumeWithException(job.getCompletionException())
- else
- continuation.resume(value)
- }
- })
- else
+ dispatcher.dispatch(context, Runnable {
withCoroutineContext(context) {
- continuation.resume(value)
+ if (job != null && job.isCompleted)
+ continuation.resumeWithException(job.getCompletionException())
+ else
+ continuation.resume(value)
}
+ })
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index 0ef0ffa..4630cc5 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -49,33 +49,48 @@
* implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
* It stores the result of continuation in the state of the job.
*
- * @param context the new context for the coroutine. Use [newCoroutineContext] to create it.
* @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
*/
public abstract class AbstractCoroutine<in T>(
- context: CoroutineContext,
active: Boolean
) : JobSupport(active), Continuation<T>, CoroutineScope {
- @Suppress("LeakingThis")
- override val context: CoroutineContext = context + this // merges this job into this context
+ // context must be Ok for unsafe publishing (it is persistent),
+ // so we don't mark this _context variable as volatile, but leave potential benign race here
+ private var _context: CoroutineContext? = null // created on first need
- final override fun resume(value: T) {
+ @Suppress("LeakingThis")
+ public final override val context: CoroutineContext
+ get() = _context ?: createContext().also { _context = it }
+
+ protected abstract val parentContext: CoroutineContext
+
+ protected open fun createContext() = parentContext + this
+
+ protected open fun defaultResumeMode(): Int = 0
+
+ final override fun resume(value: T) = resume(value, defaultResumeMode())
+
+ protected fun resume(value: T, mode: Int) {
while (true) { // lock-free loop on state
val state = this.state // atomic read
when (state) {
- is Incomplete -> if (updateState(state, value)) return
+ is Incomplete -> if (updateState(state, value, mode)) return
is Cancelled -> return // ignore resumes on cancelled continuation
else -> throw IllegalStateException("Already resumed, but got value $value")
}
}
}
- final override fun resumeWithException(exception: Throwable) {
+ final override fun resumeWithException(exception: Throwable) = resumeWithException(exception, defaultResumeMode())
+
+ protected fun resumeWithException(exception: Throwable, mode: Int) {
while (true) { // lock-free loop on state
val state = this.state // atomic read
when (state) {
- is Incomplete -> if (updateState(state, CompletedExceptionally(exception))) return
+ is Incomplete -> {
+ if (updateState(state, CompletedExceptionally(state.idempotentStart, exception), mode)) return
+ }
is Cancelled -> {
// ignore resumes on cancelled continuation, but handle exception if a different one is here
if (exception != state.exception) handleCoroutineException(context, exception)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 38356b6..9c75420 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -16,6 +16,9 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.startCoroutine
@@ -67,6 +70,12 @@
public suspend fun await(): T
/**
+ * Registers [onAwait][SelectBuilder.onAwait] select clause.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
+
+ /**
* Returns *completed* result or throws [IllegalStateException] if this deferred value has not
* [completed][isCompleted] yet. It throws the corresponding exception if this deferred has
* [completed exceptionally][isCompletedExceptionally].
@@ -118,9 +127,9 @@
async(context, block = block)
private open class DeferredCoroutine<T>(
- context: CoroutineContext,
+ override val parentContext: CoroutineContext,
active: Boolean
-) : AbstractCoroutine<T>(context, active), Deferred<T> {
+) : AbstractCoroutine<T>(active), Deferred<T> {
override val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
override val isCancelled: Boolean get() = state is Cancelled
@@ -152,6 +161,25 @@
})
}
+ override fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R) {
+ if (select.isSelected) return
+ val state = this.state
+ if (state is Incomplete) {
+ select.unregisterOnCompletion(invokeOnCompletion(SelectOnCompletion(this, select, block)))
+ } else
+ selectCompletion(select, block, state)
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ internal fun <R> selectCompletion(select: SelectInstance<R>, block: suspend (T) -> R, state: Any? = this.state) {
+ if (select.trySelect(idempotent = null)) {
+ if (state is CompletedExceptionally)
+ select.resumeSelectWithException(state.exception)
+ else
+ block.startUndispatchedCoroutine(state as T, select.completion)
+ }
+ }
+
@Suppress("UNCHECKED_CAST")
override fun getCompleted(): T {
val state = this.state
@@ -161,10 +189,19 @@
}
}
+private class SelectOnCompletion<T, R>(
+ deferred: DeferredCoroutine<T>,
+ private val select: SelectInstance<R>,
+ private val block: suspend (T) -> R
+) : JobNode<DeferredCoroutine<T>>(deferred) {
+ override fun invoke(reason: Throwable?) = job.selectCompletion(select, block)
+ override fun toString(): String = "SelectOnCompletion[$select]"
+}
+
private class LazyDeferredCoroutine<T>(
- context: CoroutineContext,
- val block: suspend CoroutineScope.() -> T
-) : DeferredCoroutine<T>(context, active = false) {
+ parentContext: CoroutineContext,
+ private val block: suspend CoroutineScope.() -> T
+) : DeferredCoroutine<T>(parentContext, active = false) {
override fun onStart() {
block.startCoroutine(this, this)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index abd34bb..8c93e97 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -290,12 +290,16 @@
return
}
// directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
- val newRegistration = parent.invokeOnCompletion(CancelOnCompletion(parent, this))
+ val newRegistration = parent.invokeOnCompletion(ParentOnCompletion(parent, this))
registration = newRegistration
// now check our state _after_ registering (see updateState order of actions)
if (isCompleted) newRegistration.unregister()
}
+ internal open fun onParentCompletion(cause: Throwable?) {
+ cancel()
+ }
+
/**
* Returns current state of this job.
*/
@@ -310,9 +314,9 @@
/**
* Tries to update current [state] of this job.
*/
- internal fun updateState(expect: Any, update: Any?): Boolean {
+ internal fun updateState(expect: Any, update: Any?, mode: Int): Boolean {
if (!tryUpdateState(expect, update)) return false
- completeUpdateState(expect, update)
+ completeUpdateState(expect, update, mode)
return true
}
@@ -324,7 +328,7 @@
return true // continues in completeUpdateState
}
- internal fun completeUpdateState(expect: Any, update: Any?) {
+ internal fun completeUpdateState(expect: Any, update: Any?, mode: Int) {
// Invoke completion handlers
val cause = (update as? CompletedExceptionally)?.cause
var completionException: Throwable? = null
@@ -350,7 +354,7 @@
// handle invokeOnCompletion exceptions
completionException?.let { handleCompletionException(it) }
// Do other (overridable) processing after completion handlers
- afterCompletion(update)
+ afterCompletion(update, mode)
}
public final override val isActive: Boolean get() {
@@ -378,63 +382,110 @@
// return: 0 -> false (not new), 1 -> true (started), -1 -> retry
internal fun startInternal(state: Any?): Int {
when {
- // EMPTY_NEW state -- no completion handlers, new
- state === EmptyNew -> {
+ state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
if (!STATE.compareAndSet(this, state, EmptyActive)) return -1
onStart()
return 1
}
- // LIST -- a list of completion handlers (either new or active)
- state is NodeList -> {
+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
if (state.isActive) return 0
if (!NodeList.ACTIVE.compareAndSet(state, null, NodeList.ACTIVE_STATE)) return -1
onStart()
return 1
}
- // not a new state
- else -> return 0
+ else -> return 0 // not a new state
}
}
- internal fun describeStart(failureMarker: Any): AtomicDesc =
- object : AtomicDesc() {
- override fun prepare(op: AtomicOp): Any? {
- while (true) { // lock-free loop on state
- val state = this@JobSupport._state
- when {
- state === op -> return null // already in progress
- state is OpDescriptor -> state.perform(this@JobSupport) // help
- state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
- if (STATE.compareAndSet(this@JobSupport, state, op)) return null // success
+ // it is just like start(), but support idempotent start
+ public fun trySelect(idempotent: Any?): Boolean {
+ if (idempotent == null) return start() // non idempotent -- use plain start
+ check(idempotent !is OpDescriptor) { "cannot use OpDescriptor as idempotent marker"}
+ while (true) { // lock-free loop on state
+ val state = this.state
+ when {
+ state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
+ // try to promote it to list in new state
+ STATE.compareAndSet(this, state, NodeList(active = false))
+ }
+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+ val active = state.active
+ if (active === idempotent) return true // was activated with the same marker --> true
+ if (active != null) return false
+ if (NodeList.ACTIVE.compareAndSet(state, null, idempotent)) {
+ onStart()
+ return true
+ }
+ }
+ state is CompletedIdempotentStart -> { // remembers idempotent start token
+ return state.idempotentStart === idempotent
+ }
+ else -> return false
+ }
+ }
+ }
+
+ public fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
+ public fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
+
+ private inner class AtomicSelectOp(
+ @JvmField val desc: AtomicDesc,
+ @JvmField val activate: Boolean
+ ) : AtomicOp () {
+ override fun prepare(): Any? = prepareIfNotSelected() ?: desc.prepare(this)
+
+ override fun complete(affected: Any?, failure: Any?) {
+ completeSelect(failure)
+ desc.complete(this, failure)
+ }
+
+ fun prepareIfNotSelected(): Any? {
+ while (true) { // lock-free loop on state
+ val state = _state
+ when {
+ state === this@AtomicSelectOp -> return null // already in progress
+ state is OpDescriptor -> state.perform(this@JobSupport) // help
+ state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
+ if (STATE.compareAndSet(this@JobSupport, state, this@AtomicSelectOp)) return null // success
+ }
+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+ val active = state._active
+ when {
+ active == null -> {
+ if (NodeList.ACTIVE.compareAndSet(state, null, this@AtomicSelectOp)) return null // success
+ }
+ active === this@AtomicSelectOp -> return null // already in progress
+ active is OpDescriptor -> active.perform(state) // help
+ else -> return ALREADY_SELECTED // active state
}
- state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
- if (state.isActive) return failureMarker
- if (NodeList.ACTIVE.compareAndSet(state, null, op)) return null // success
+ }
+ else -> return ALREADY_SELECTED // not a new state
+ }
+ }
+ }
+
+ private fun completeSelect(failure: Any?) {
+ val success = failure == null
+ val state = _state
+ when {
+ state === this -> {
+ val update = if (success && activate) EmptyActive else EmptyNew
+ if (STATE.compareAndSet(this@JobSupport, this, update)) {
+ if (success) onStart()
+ }
+ }
+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
+ if (state._active === this) {
+ val update = if (success && activate) NodeList.ACTIVE_STATE else null
+ if (NodeList.ACTIVE.compareAndSet(state, this, update)) {
+ if (success) onStart()
}
- else -> return failureMarker // not a new state
}
}
}
- override fun complete(op: AtomicOp, failure: Any?) {
- val success = failure == null
- val state = this@JobSupport._state
- when {
- state === op -> {
- if (STATE.compareAndSet(this@JobSupport, op, if (success) EmptyActive else EmptyNew)) {
- if (success) onStart()
- }
- }
- state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
- if (state._active === op) {
- if (NodeList.ACTIVE.compareAndSet(state, op, if (success) NodeList.ACTIVE_STATE else null)) {
- if (success) onStart()
- }
- }
- }
- }
- }
}
+ }
/**
* Override to provide the actual [start] action.
@@ -457,19 +508,16 @@
while (true) { // lock-free loop on state
val state = this.state
when {
- // EMPTY_ACTIVE state -- no completion handlers, active
- state === EmptyActive -> {
+ state === EmptyActive -> { // EMPTY_ACTIVE state -- no completion handlers, active
// try move to SINGLE state
val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
if (STATE.compareAndSet(this, state, node)) return node
}
- // EMPTY_NEW state -- no completion handlers, new
- state === EmptyNew -> {
+ state === EmptyNew -> { // EMPTY_NEW state -- no completion handlers, new
// try to promote it to list in new state
STATE.compareAndSet(this, state, NodeList(active = false))
}
- // SINGLE/SINGLE+ state -- one completion handler
- state is JobNode<*> -> {
+ state is JobNode<*> -> { // SINGLE/SINGLE+ state -- one completion handler
// try to promote it to list (SINGLE+ state)
state.addOneIfEmpty(NodeList(active = true))
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
@@ -477,13 +525,11 @@
// just attempt converting it to list if state is still the same, then continue lock-free loop
STATE.compareAndSet(this, state, list)
}
- // LIST -- a list of completion handlers (either new or active)
- state is NodeList -> {
+ state is NodeList -> { // LIST -- a list of completion handlers (either new or active)
val node = nodeCache ?: makeNode(handler).also { nodeCache = it }
if (state.addLastIf(node) { this.state === state }) return node
}
- // is inactive
- else -> {
+ else -> { // is inactive
handler((state as? CompletedExceptionally)?.exception)
return EmptyRegistration
}
@@ -529,7 +575,7 @@
final override fun cancel(cause: Throwable?): Boolean {
while (true) { // lock-free loop on state
val state = this.state as? Incomplete ?: return false // quit if already complete
- if (updateState(state, Cancelled(cause))) return true
+ if (updateState(state, Cancelled(state.idempotentStart, cause), mode = 0)) return true
}
}
@@ -543,7 +589,7 @@
/**
* Override for post-completion actions that need to do something with the state.
*/
- protected open fun afterCompletion(state: Any?) {}
+ protected open fun afterCompletion(state: Any?, mode: Int) {}
private fun makeNode(handler: CompletionHandler): JobNode<*> =
(handler as? JobNode<*>)?.also { require(it.job === this) }
@@ -557,22 +603,31 @@
*/
public interface Incomplete {
val isActive: Boolean
+ val idempotentStart: Any? // != null if this state is a descendant of trySelect(idempotent)
}
private class NodeList(
active: Boolean
) : LockFreeLinkedListHead(), Incomplete {
@Volatile
+ @JvmField
var _active: Any? = if (active) ACTIVE_STATE else null
- override val isActive: Boolean get() {
+ val active: Any? get() {
while (true) { // helper loop for atomic ops
val active = this._active
- if (active !is OpDescriptor) return active != null
+ if (active !is OpDescriptor) return active
active.perform(this)
}
}
+ override val isActive: Boolean get() = active != null
+
+ override val idempotentStart: Any? get() {
+ val active = this.active
+ return if (active === ACTIVE_STATE) null else active
+ }
+
companion object {
@JvmStatic
val ACTIVE: AtomicReferenceFieldUpdater<NodeList, Any?> =
@@ -595,13 +650,20 @@
}
}
+ public open class CompletedIdempotentStart(
+ @JvmField val idempotentStart: Any?
+ )
+
/**
* Class for a [state] of a job that had completed exceptionally, including cancellation.
*
* @param cause the exceptional completion cause. If `cause` is null, then a [CancellationException]
* if created on first get from [exception] property.
*/
- public open class CompletedExceptionally(val cause: Throwable?) {
+ public open class CompletedExceptionally(
+ idempotentStart: Any?,
+ @JvmField val cause: Throwable?
+ ) : CompletedIdempotentStart(idempotentStart) {
@Volatile
private var _exception: Throwable? = cause // materialize CancellationException on first need
@@ -618,20 +680,27 @@
/**
* A specific subclass of [CompletedExceptionally] for cancelled jobs.
*/
- public class Cancelled(cause: Throwable?) : CompletedExceptionally(cause)
+ public class Cancelled(
+ idempotentStart: Any?,
+ cause: Throwable?
+ ) : CompletedExceptionally(idempotentStart, cause)
}
+internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")
+
private val EmptyNew = Empty(false)
private val EmptyActive = Empty(true)
private class Empty(override val isActive: Boolean) : JobSupport.Incomplete {
+ override val idempotentStart: Any? get() = null
override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}
internal abstract class JobNode<out J : Job>(
- val job: J
+ @JvmField val job: J
) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Incomplete {
final override val isActive: Boolean get() = true
+ final override val idempotentStart: Any? get() = null
// if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
// directly without wrapping
final override fun unregister() = (job as JobSupport).removeNode(this)
@@ -640,7 +709,7 @@
private class InvokeOnCompletion(
job: Job,
- val handler: CompletionHandler
+ @JvmField val handler: CompletionHandler
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = handler.invoke(reason)
override fun toString() = "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
@@ -648,15 +717,15 @@
private class ResumeOnCompletion(
job: Job,
- val continuation: Continuation<Unit>
+ @JvmField val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = continuation.resume(Unit)
override fun toString() = "ResumeOnCompletion[$continuation]"
}
-private class UnregisterOnCompletion(
+internal class UnregisterOnCompletion(
job: Job,
- val registration: Job.Registration
+ @JvmField val registration: Job.Registration
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) = registration.unregister()
override fun toString(): String = "UnregisterOnCompletion[$registration]"
@@ -664,15 +733,23 @@
private class CancelOnCompletion(
parentJob: Job,
- val subordinateJob: Job
+ @JvmField val subordinateJob: Job
) : JobNode<Job>(parentJob) {
override fun invoke(reason: Throwable?) { subordinateJob.cancel(reason) }
override fun toString(): String = "CancelOnCompletion[$subordinateJob]"
}
+private class ParentOnCompletion(
+ parentJob: Job,
+ @JvmField val subordinateJob: JobSupport
+) : JobNode<Job>(parentJob) {
+ override fun invoke(reason: Throwable?) { subordinateJob.onParentCompletion(reason) }
+ override fun toString(): String = "ParentOnCompletion[$subordinateJob]"
+}
+
private class CancelFutureOnCompletion(
job: Job,
- val future: Future<*>
+ @JvmField val future: Future<*>
) : JobNode<Job>(job) {
override fun invoke(reason: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
index f9485b0..014f555 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
@@ -16,6 +16,9 @@
package kotlinx.coroutines.experimental
+import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
+import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
+
/**
* Yields a thread (or thread pool) of the current coroutine dispatcher to other coroutines to run.
* If the coroutine dispatcher does not have its own thread pool (like [Unconfined] dispatcher) then this
@@ -24,6 +27,12 @@
* If the [Job] of the current coroutine is completed when this suspending function is invoked or while
* this function is waiting for dispatching, it resumes with [CancellationException].
*/
-suspend fun yield(): Unit = suspendCancellableCoroutine sc@ { cont ->
- (cont as CancellableContinuationImpl).resumeYield(Unit)
+suspend fun yield(): Unit = suspendCoroutineOrReturn sc@ { cont ->
+ val context = cont.context
+ val job = context[Job]
+ if (job != null && job.isCompleted) throw job.getCompletionException()
+ if (cont !is DispatchedContinuation<Unit>) return@sc Unit
+ if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
+ cont.dispatchYield(job, Unit)
+ COROUTINE_SUSPENDED
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index a335f0e..7ae65f4 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -16,14 +16,16 @@
package kotlinx.coroutines.experimental.channels
-import kotlinx.coroutines.experimental.CancellableContinuation
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
-import kotlinx.coroutines.experimental.removeOnCancel
-import kotlinx.coroutines.experimental.suspendCancellableCoroutine
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.internal.*
+import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
+import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlin.coroutines.experimental.startCoroutine
/**
* Abstract channel. It is a base class for buffered and unbuffered channels.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
*/
public abstract class AbstractChannel<E> : Channel<E> {
private val queue = LockFreeLinkedListHead()
@@ -52,11 +54,22 @@
protected abstract fun offerInternal(element: E): Any
/**
+ * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
+ * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+ */
+ protected abstract fun offerSelectInternal(element: E, select: SelectInstance<*>): Any
+
+ /**
* Tries to remove element from buffer or from queued sender.
- * Return type is `E | POLL_EMPTY | Closed`
+ * Return type is `E | POLL_FAILED | Closed`
*/
protected abstract fun pollInternal(): Any?
+ /**
+ * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
+ * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ */
+ protected abstract fun pollSelectInternal(select: SelectInstance<*>): Any?
// ------ state functions for concrete implementations ------
@@ -68,7 +81,7 @@
/**
* Returns non-null closed token if it is last in the queue.
*/
- protected val closedForSend: Any? get() = queue.prev as? Closed<*>
+ protected val closedForSend: ReceiveOrClosed<*>? get() = queue.prev as? Closed<*>
// ------ SendChannel ------
@@ -86,13 +99,14 @@
val result = offerInternal(element)
return when {
result === OFFER_SUCCESS -> true
+ result === OFFER_FAILED -> false
result is Closed<*> -> throw result.sendException
- else -> false
+ else -> error("offerInternal returned $result")
}
}
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutine(true) sc@ { cont ->
- val send = SendElement(cont, element)
+ val send = SendElement(element, cont)
loop@ while (true) {
if (enqueueSend(send)) {
cont.initCancellability() // make it properly cancellable
@@ -106,10 +120,12 @@
cont.resume(Unit)
return@sc
}
+ result === OFFER_FAILED -> continue@loop
result is Closed<*> -> {
cont.resumeWithException(result.sendException)
return@sc
}
+ else -> error("offerInternal returned $result")
}
}
}
@@ -147,7 +163,82 @@
* Retrieves first receiving waiter from the queue or returns closed token.
*/
protected fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
- queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>> { it is Closed<*> }
+ queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
+
+ // ------ registerSelectSend ------
+
+ protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
+
+ protected class TryOfferDesc<E>(
+ @JvmField val element: E,
+ queue: LockFreeLinkedListHead
+ ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
+ @JvmField var resumeToken: Any? = null
+
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
+ if (affected is Closed<*>) return affected
+ return null
+ }
+
+ override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
+ val token = node.tryResumeReceive(element, idempotent = this) ?: return false
+ resumeToken = token
+ return true
+ }
+ }
+
+ private inner class TryEnqueueSendDesc<E, R>(
+ element: E,
+ select: SelectInstance<R>,
+ block: suspend () -> R
+ ) : AddLastDesc(queue, SendSelect(element, select, block)) {
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected is ReceiveOrClosed<*>) {
+ return affected as? Closed<*> ?: ENQUEUE_FAILED
+ }
+ return null
+ }
+
+ override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+ if (!isBufferFull) return ENQUEUE_FAILED
+ return super.onPrepare(affected, next)
+ }
+
+ override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+ super.finishOnSuccess(affected, next)
+ // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+ (node as SendSelect<*>).removeOnSelectCompletion()
+ }
+ }
+
+ override fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R) {
+ while (true) {
+ if (select.isSelected) return
+ if (isFull) {
+ val enqueueOp = TryEnqueueSendDesc(element, select, block)
+ val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ when {
+ enqueueResult === ALREADY_SELECTED -> return
+ enqueueResult === ENQUEUE_FAILED -> {} // retry
+ enqueueResult is Closed<*> -> throw enqueueResult.sendException
+ else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
+ }
+ } else {
+ val offerResult = offerSelectInternal(element, select)
+ when {
+ offerResult === ALREADY_SELECTED -> return
+ offerResult === OFFER_FAILED -> {} // retry
+ offerResult === OFFER_SUCCESS -> {
+ block.startUndispatchedCoroutine(select.completion)
+ return
+ }
+ offerResult is Closed<*> -> throw offerResult.sendException
+ else -> error("offerSelectInternal returned $offerResult")
+ }
+ }
+ }
+ }
// ------ ReceiveChannel ------
@@ -158,7 +249,7 @@
public final override suspend fun receive(): E {
// fast path -- try poll non-blocking
val result = pollInternal()
- if (result !== POLL_EMPTY) return receiveResult(result)
+ if (result !== POLL_FAILED) return receiveResult(result)
// slow-path does suspend
return receiveSuspend()
}
@@ -171,7 +262,7 @@
@Suppress("UNCHECKED_CAST")
private suspend fun receiveSuspend(): E = suspendCancellableCoroutine(true) sc@ { cont ->
- val receive = ReceiveNonNull(cont)
+ val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability() // make it properly cancellable
@@ -184,7 +275,7 @@
cont.resumeWithException(result.receiveException)
return@sc
}
- if (result !== POLL_EMPTY) {
+ if (result !== POLL_FAILED) {
cont.resume(result as E)
return@sc
}
@@ -203,7 +294,7 @@
public final override suspend fun receiveOrNull(): E? {
// fast path -- try poll non-blocking
val result = pollInternal()
- if (result !== POLL_EMPTY) return receiveOrNullResult(result)
+ if (result !== POLL_FAILED) return receiveOrNullResult(result)
// slow-path does suspend
return receiveOrNullSuspend()
}
@@ -211,7 +302,7 @@
@Suppress("UNCHECKED_CAST")
private fun receiveOrNullResult(result: Any?): E? {
if (result is Closed<*>) {
- if (result.closeCause != null) throw result.receiveException
+ if (result.closeCause != null) throw result.closeCause
return null
}
return result as E
@@ -219,7 +310,7 @@
@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrNullSuspend(): E? = suspendCancellableCoroutine(true) sc@ { cont ->
- val receive = ReceiveOrNull(cont)
+ val receive = ReceiveElement(cont, nullOnClose = true)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability() // make it properly cancellable
@@ -232,10 +323,10 @@
if (result.closeCause == null)
cont.resume(null)
else
- cont.resumeWithException(result.receiveException)
+ cont.resumeWithException(result.closeCause)
return@sc
}
- if (result !== POLL_EMPTY) {
+ if (result !== POLL_FAILED) {
cont.resume(result as E)
return@sc
}
@@ -245,7 +336,7 @@
@Suppress("UNCHECKED_CAST")
public final override fun poll(): E? {
val result = pollInternal()
- return if (result === POLL_EMPTY) null else receiveOrNullResult(result)
+ return if (result === POLL_FAILED) null else receiveOrNullResult(result)
}
public final override fun iterator(): ChannelIterator<E> = Iterator(this)
@@ -256,27 +347,139 @@
protected fun takeFirstSendOrPeekClosed(): Send? =
queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
- protected companion object {
- const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+ // ------ registerSelectReceive ------
- val OFFER_SUCCESS: Any = Marker("OFFER_SUCCESS")
- val OFFER_FAILED: Any = Marker("OFFER_FAILED")
+ protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
- val POLL_EMPTY: Any = Marker("POLL_EMPTY")
+ protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
+ @JvmField var resumeToken: Any? = null
+ @JvmField var pollResult: E? = null
- fun isClosed(result: Any?): Boolean = result is Closed<*>
- }
-
- // for debugging
- private class Marker(val string: String) {
- override fun toString(): String = string
- }
-
- private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
- cont.invokeOnCompletion {
- if (cont.isCancelled && receive.remove())
- onCancelledReceive()
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected is Closed<*>) return affected
+ if (affected !is Send) return POLL_FAILED
+ return null
}
+
+ @Suppress("UNCHECKED_CAST")
+ override fun validatePrepared(node: Send): Boolean {
+ val token = node.tryResumeSend(this) ?: return false
+ resumeToken = token
+ pollResult = node.pollResult as E
+ return true
+ }
+ }
+
+ private inner class TryEnqueueReceiveDesc<E, R>(
+ select: SelectInstance<R>,
+ block: suspend (E?) -> R,
+ nullOnClose: Boolean
+ ) : AddLastDesc(queue, ReceiveSelect(select, block, nullOnClose)) {
+ override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
+ if (affected is Send) return ENQUEUE_FAILED
+ return null
+ }
+
+ override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
+ if (!isBufferEmpty) return ENQUEUE_FAILED
+ return super.onPrepare(affected, next)
+ }
+
+ override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
+ super.finishOnSuccess(affected, next)
+ // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
+ (node as ReceiveSelect<*, *>).removeOnSelectCompletion()
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
+ while (true) {
+ if (select.isSelected) return
+ if (isEmpty) {
+ val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
+ val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ when {
+ enqueueResult === ALREADY_SELECTED -> return
+ enqueueResult === ENQUEUE_FAILED -> {} // retry
+ else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+ }
+ } else {
+ val pollResult = pollSelectInternal(select)
+ when {
+ pollResult === ALREADY_SELECTED -> return
+ pollResult === POLL_FAILED -> {} // retry
+ pollResult is Closed<*> -> throw pollResult.receiveException
+ else -> {
+ block.startUndispatchedCoroutine(pollResult as E, select.completion)
+ return
+ }
+ }
+ }
+ }
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
+ while (true) {
+ if (select.isSelected) return
+ if (isEmpty) {
+ val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
+ val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ when {
+ enqueueResult === ALREADY_SELECTED -> return
+ enqueueResult === ENQUEUE_FAILED -> {} // retry
+ else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
+ }
+ } else {
+ val pollResult = pollSelectInternal(select)
+ when {
+ pollResult === ALREADY_SELECTED -> return
+ pollResult === POLL_FAILED -> {} // retry
+ pollResult is Closed<*> -> {
+ if (pollResult.closeCause == null) {
+ if (select.trySelect(idempotent = null))
+ block.startUndispatchedCoroutine(null, select.completion)
+ return
+ } else
+ throw pollResult.closeCause
+ }
+ else -> {
+ // selected successfully
+ block.startUndispatchedCoroutine(pollResult as E, select.completion)
+ return
+ }
+ }
+ }
+ }
+ }
+
+ // ------ protected ------
+
+ protected companion object {
+ private const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+
+ @JvmStatic
+ val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
+ @JvmStatic
+ val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
+
+ @JvmStatic
+ val POLL_FAILED: Any = Symbol("POLL_FAILED")
+
+ @JvmStatic
+ val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
+
+ @JvmStatic
+ private val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
+ @JvmStatic
+ private val NULL_VALUE: Any = Symbol("NULL_VALUE")
+
+ @JvmStatic
+ private val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
+
+ @JvmStatic
+ fun isClosed(result: Any?): Boolean = result is Closed<*>
}
/**
@@ -289,15 +492,24 @@
*/
protected open fun onCancelledReceive() {}
+ // ------ private ------
+
+ private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
+ cont.invokeOnCompletion {
+ if (cont.isCancelled && receive.remove())
+ onCancelledReceive()
+ }
+ }
+
private class Iterator<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
- var result: Any? = POLL_EMPTY // E | POLL_CLOSED | POLL_EMPTY
+ var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
suspend override fun hasNext(): Boolean {
// check for repeated hasNext
- if (result !== POLL_EMPTY) return hasNextResult(result)
+ if (result !== POLL_FAILED) return hasNextResult(result)
// fast path -- try poll non-blocking
result = channel.pollInternal()
- if (result !== POLL_EMPTY) return hasNextResult(result)
+ if (result !== POLL_FAILED) return hasNextResult(result)
// slow-path does suspend
return hasNextSuspend()
}
@@ -328,7 +540,7 @@
cont.resumeWithException(result.receiveException)
return@sc
}
- if (result !== POLL_EMPTY) {
+ if (result !== POLL_FAILED) {
cont.resume(true)
return@sc
}
@@ -339,8 +551,8 @@
suspend override fun next(): E {
val result = this.result
if (result is Closed<*>) throw result.receiveException
- if (result !== POLL_EMPTY) {
- this.result = POLL_EMPTY
+ if (result !== POLL_FAILED) {
+ this.result = POLL_FAILED
return result as E
}
// rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
@@ -353,7 +565,7 @@
*/
protected interface Send {
val pollResult: Any? // E | Closed
- fun tryResumeSend(): Any?
+ fun tryResumeSend(idempotent: Any?): Any?
fun completeResumeSend(token: Any)
}
@@ -362,31 +574,59 @@
*/
protected interface ReceiveOrClosed<in E> {
val offerResult: Any // OFFER_SUCCESS | Closed
- fun tryResumeReceive(value: E): Any?
+ fun tryResumeReceive(value: E, idempotent: Any?): Any?
fun completeResumeReceive(token: Any)
}
@Suppress("UNCHECKED_CAST")
private class SendElement(
- val cont: CancellableContinuation<Unit>,
- override val pollResult: Any?
+ override val pollResult: Any?,
+ @JvmField val cont: CancellableContinuation<Unit>
) : LockFreeLinkedListNode(), Send {
- override fun tryResumeSend(): Any? = cont.tryResume(Unit)
+ override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
override fun completeResumeSend(token: Any) = cont.completeResume(token)
+ override fun toString(): String = "SendElement($pollResult)[$cont]"
}
- private class Closed<in E>(
- val closeCause: Throwable?
+ private class SendSelect<R>(
+ override val pollResult: Any?,
+ @JvmField val select: SelectInstance<R>,
+ @JvmField val block: suspend () -> R
+ ) : LockFreeLinkedListNode(), Send, CompletionHandler {
+ override fun tryResumeSend(idempotent: Any?): Any? =
+ if (select.trySelect(idempotent)) SELECT_STARTED else null
+
+ override fun completeResumeSend(token: Any) {
+ check(token === SELECT_STARTED)
+ block.startCoroutine(select.completion)
+ }
+
+ fun removeOnSelectCompletion() {
+ select.invokeOnCompletion(this)
+ }
+
+ override fun invoke(cause: Throwable?) {
+ remove()
+ }
+
+ override fun toString(): String = "SendSelect($pollResult)[$select]"
+ }
+
+ /**
+ * Represents closed channel.
+ */
+ protected class Closed<in E>(
+ @JvmField val closeCause: Throwable?
) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
@Volatile
- var _sendException: Throwable? = null
+ private var _sendException: Throwable? = null
val sendException: Throwable get() = _sendException ?:
(closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE))
.also { _sendException = it }
@Volatile
- var _receiveException: Throwable? = null
+ private var _receiveException: Throwable? = null
val receiveException: Throwable get() = _receiveException ?:
(closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE))
@@ -394,10 +634,11 @@
override val offerResult get() = this
override val pollResult get() = this
- override fun tryResumeSend(): Boolean = true
- override fun completeResumeSend(token: Any) {}
- override fun tryResumeReceive(value: E): Any? = throw sendException
+ override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
+ override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? = throw sendException
override fun completeResumeReceive(token: Any) = throw sendException
+ override fun toString(): String = "Closed[$closeCause]"
}
private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
@@ -405,34 +646,51 @@
abstract fun resumeReceiveClosed(closed: Closed<*>)
}
- private class ReceiveNonNull<in E>(val cont: CancellableContinuation<E>) : Receive<E>() {
- override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
- override fun completeResumeReceive(token: Any) = cont.completeResume(token)
- override fun resumeReceiveClosed(closed: Closed<*>) = cont.resumeWithException(closed.receiveException)
- }
-
- private class ReceiveOrNull<in E>(val cont: CancellableContinuation<E?>) : Receive<E>() {
- override fun tryResumeReceive(value: E): Any? = cont.tryResume(value)
+ private class ReceiveElement<in E>(
+ @JvmField val cont: CancellableContinuation<E?>,
+ @JvmField val nullOnClose: Boolean
+ ) : Receive<E>() {
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
override fun resumeReceiveClosed(closed: Closed<*>) {
- if (closed.closeCause == null)
+ if (closed.closeCause == null && nullOnClose)
cont.resume(null)
else
cont.resumeWithException(closed.receiveException)
}
+ override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
}
+ private class IdempotentTokenValue<out E>(
+ @JvmField val token: Any,
+ @JvmField val value: E
+ )
+
private class ReceiveHasNext<E>(
- val iterator: Iterator<E>,
- val cont: CancellableContinuation<Boolean>
+ @JvmField val iterator: Iterator<E>,
+ @JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
- override fun tryResumeReceive(value: E): Any? {
- val token = cont.tryResume(true)
- if (token != null) iterator.result = value
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
+ val token = cont.tryResume(true, idempotent)
+ if (token != null) {
+ /*
+ When idempotent != null this invocation can be stale and we cannot directly update iterator.result
+ Instead, we save both token & result into a temporary IdempotentTokenValue object and
+ set iterator result only in completeResumeReceive that is going to be invoked just once
+ */
+ if (idempotent != null) return IdempotentTokenValue(token, value)
+ iterator.result = value
+ }
return token
}
- override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+ override fun completeResumeReceive(token: Any) {
+ if (token is IdempotentTokenValue<*>) {
+ iterator.result = token.value
+ cont.completeResume(token.token)
+ } else
+ cont.completeResume(token)
+ }
override fun resumeReceiveClosed(closed: Closed<*>) {
val token = if (closed.closeCause == null)
@@ -444,5 +702,40 @@
cont.completeResume(token)
}
}
+ override fun toString(): String = "ReceiveHasNext[$cont]"
+ }
+
+ private class ReceiveSelect<R, in E>(
+ @JvmField val select: SelectInstance<R>,
+ @JvmField val block: suspend (E?) -> R,
+ @JvmField val nullOnClose: Boolean
+ ) : Receive<E>(), CompletionHandler {
+ override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
+ if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
+
+ @Suppress("UNCHECKED_CAST")
+ override fun completeResumeReceive(token: Any) {
+ val value: E = (if (token === NULL_VALUE) null else token) as E
+ block.startCoroutine(value, select.completion)
+ }
+
+ override fun resumeReceiveClosed(closed: Closed<*>) {
+ if (select.trySelect(idempotent = null)) {
+ if (closed.closeCause == null && nullOnClose) {
+ block.startCoroutine(null, select.completion)
+ } else
+ select.resumeSelectWithException(closed.receiveException)
+ }
+ }
+
+ fun removeOnSelectCompletion() {
+ select.invokeOnCompletion(this)
+ }
+
+ override fun invoke(cause: Throwable?) {
+ remove()
+ }
+
+ override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
index b5265fa..c75f1bb 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt
@@ -16,6 +16,8 @@
package kotlinx.coroutines.experimental.channels
+import kotlinx.coroutines.experimental.ALREADY_SELECTED
+import kotlinx.coroutines.experimental.selects.SelectInstance
import java.util.concurrent.locks.ReentrantLock
/**
@@ -53,8 +55,8 @@
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
override fun offerInternal(element: E): Any {
- var token: Any? = null
var receive: ReceiveOrClosed<E>? = null
+ var token: Any? = null
locked {
val size = this.size
closedForSend?.let { return it }
@@ -63,9 +65,13 @@
this.size = size + 1 // update size before checking queue (!!!)
// check for receivers that were waiting on empty queue
if (size == 0) {
- while (true) {
- receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
- token = receive!!.tryResumeReceive(element)
+ loop@ while (true) {
+ receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
+ if (receive is Closed) {
+ this.size = size // restore size
+ return receive!!
+ }
+ token = receive!!.tryResumeReceive(element, idempotent = null)
if (token != null) {
this.size = size // restore size
return@locked
@@ -83,31 +89,79 @@
return receive!!.offerResult
}
- // result is `E | POLL_EMPTY | Closed`
- override fun pollInternal(): Any? {
+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+ override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ var receive: ReceiveOrClosed<E>? = null
var token: Any? = null
+ locked {
+ val size = this.size
+ closedForSend?.let { return it }
+ if (size < capacity) {
+ // tentatively put element to buffer
+ this.size = size + 1 // update size before checking queue (!!!)
+ // check for receivers that were waiting on empty queue
+ if (size == 0) {
+ loop@ while (true) {
+ val offerOp = describeTryOffer(element)
+ val failure = select.performAtomicTrySelect(offerOp)
+ when {
+ failure == null -> { // offered successfully
+ this.size = size // restore size
+ receive = offerOp.result
+ token = offerOp.resumeToken
+ check(token != null)
+ return@locked
+ }
+ failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
+ failure === ALREADY_SELECTED || failure is Closed<*> -> {
+ this.size = size // restore size
+ return failure
+ }
+ else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+ }
+ }
+ }
+ // let's try to select sending this element to buffer
+ if (!select.trySelect(null)) {
+ this.size = size // restore size
+ return ALREADY_SELECTED
+ }
+ buffer[(head + size) % capacity] = element // actually queue element
+ return OFFER_SUCCESS
+ }
+ // size == capacity: full
+ return OFFER_FAILED
+ }
+ // breaks here if offer meets receiver
+ receive!!.completeResumeReceive(token!!)
+ return receive!!.offerResult
+ }
+
+ // result is `E | POLL_FAILED | Closed`
+ override fun pollInternal(): Any? {
var send: Send? = null
+ var token: Any? = null
var result: Any? = null
locked {
val size = this.size
- if (size == 0) return closedForSend ?: POLL_EMPTY
+ if (size == 0) return closedForSend ?: POLL_FAILED
// size > 0: not empty -- retrieve element
result = buffer[head]
buffer[head] = null
this.size = size - 1 // update size before checking queue (!!!)
// check for senders that were waiting on full queue
- var replacement: Any? = POLL_EMPTY
+ var replacement: Any? = POLL_FAILED
if (size == capacity) {
- while (true) {
+ loop@ while (true) {
send = takeFirstSendOrPeekClosed() ?: break
- token = send!!.tryResumeSend()
+ token = send!!.tryResumeSend(idempotent = null)
if (token != null) {
replacement = send!!.pollResult
- break
+ break@loop
}
}
}
- if (replacement !== POLL_EMPTY && !isClosed(replacement)) {
+ if (replacement !== POLL_FAILED && !isClosed(replacement)) {
this.size = size // restore size
buffer[(head + size) % capacity] = replacement
}
@@ -118,4 +172,65 @@
send!!.completeResumeSend(token!!)
return result
}
+
+ // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ var send: Send? = null
+ var token: Any? = null
+ var result: Any? = null
+ locked {
+ val size = this.size
+ if (size == 0) return closedForSend ?: POLL_FAILED
+ // size > 0: not empty -- retrieve element
+ result = buffer[head]
+ buffer[head] = null
+ this.size = size - 1 // update size before checking queue (!!!)
+ // check for senders that were waiting on full queue
+ var replacement: Any? = POLL_FAILED
+ if (size == capacity) {
+ loop@ while (true) {
+ val pollOp = describeTryPoll()
+ val failure = select.performAtomicTrySelect(pollOp)
+ when {
+ failure == null -> { // polled successfully
+ send = pollOp.result
+ token = pollOp.resumeToken
+ check(token != null)
+ replacement = send!!.pollResult
+ break@loop
+ }
+ failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
+ failure === ALREADY_SELECTED -> {
+ this.size = size // restore size
+ buffer[head] = result // restore head
+ return failure
+ }
+ failure is Closed<*> -> {
+ send = failure
+ token = failure.tryResumeSend(idempotent = null)
+ replacement = failure
+ break@loop
+ }
+ else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
+ }
+ }
+ }
+ if (replacement !== POLL_FAILED && !isClosed(replacement)) {
+ this.size = size // restore size
+ buffer[(head + size) % capacity] = replacement
+ } else {
+ // failed to poll or is already closed --> let's try to select receiving this element from buffer
+ if (!select.trySelect(null)) {
+ this.size = size // restore size
+ buffer[head] = result // restore head
+ return ALREADY_SELECTED
+ }
+ }
+ head = (head + 1) % capacity
+ }
+ // complete send the we're taken replacement from
+ if (token != null)
+ send!!.completeResumeSend(token!!)
+ return result
+ }
}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
index fc97bb9..f46c218 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt
@@ -19,6 +19,9 @@
import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.SelectInstance
+import kotlinx.coroutines.experimental.selects.select
import kotlinx.coroutines.experimental.yield
/**
@@ -40,7 +43,7 @@
public val isFull: Boolean
/**
- * Adds [element] into to this queue, suspending the caller while this queue [isFull],
+ * Adds [element] into to this channel, suspending the caller while this channel [isFull],
* or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
* It throws the original [close] cause exception if the channel has _failed_.
*
@@ -51,6 +54,9 @@
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This function can be used in [select] invocation with [onSend][SelectBuilder.onSend] clause.
+ * Use [offer] to try sending to this channel without waiting.
*/
public suspend fun send(element: E)
@@ -63,6 +69,12 @@
public fun offer(element: E): Boolean
/**
+ * Registers [onSend][SelectBuilder.onSend] select clause.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend () -> R)
+
+ /**
* Closes this channel with an optional exceptional [cause].
* This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
* Conceptually, its sends a special close token of this channel. Immediately after invocation of this function
@@ -109,6 +121,9 @@
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This function can be used in [select] invocation with [onReceive][SelectBuilder.onReceive] clause.
+ * Use [poll] to try receiving from this channel without waiting.
*/
public suspend fun receive(): E
@@ -124,6 +139,9 @@
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This function can be used in [select] invocation with [onReceiveOrNull][SelectBuilder.onReceiveOrNull] clause.
+ * Use [poll] to try receiving from this channel without waiting.
*/
public suspend fun receiveOrNull(): E?
@@ -140,6 +158,18 @@
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public operator fun iterator(): ChannelIterator<E>
+
+ /**
+ * Registers [onReceive][SelectBuilder.onReceive] select clause.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R)
+
+ /**
+ * Registers [onReceiveOrNull][SelectBuilder.onReceiveOrNull] select clause.
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ public fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R)
}
/**
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
index ef7b97d..24c6855 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -102,10 +102,10 @@
produce(context, capacity, block)
private class ProducerCoroutine<E>(
- context: CoroutineContext,
+ override val parentContext: CoroutineContext,
override val channel: Channel<E>
-) : AbstractCoroutine<Unit>(context, active = true), ProducerScope<E>, ProducerJob<E>, Channel<E> by channel {
- override fun afterCompletion(state: Any?) {
+) : AbstractCoroutine<Unit>(active = true), ProducerScope<E>, ProducerJob<E>, Channel<E> by channel {
+ override fun afterCompletion(state: Any?, mode: Int) {
val cause = (state as? CompletedExceptionally)?.cause
if (!channel.close(cause) && cause != null)
handleCoroutineException(context, cause)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
index e1a0ebb..f0fe82e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt
@@ -16,6 +16,8 @@
package kotlinx.coroutines.experimental.channels
+import kotlinx.coroutines.experimental.selects.SelectInstance
+
/**
* Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
* to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
@@ -33,7 +35,7 @@
protected final override fun offerInternal(element: E): Any {
while (true) {
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
- val token = receive.tryResumeReceive(element)
+ val token = receive.tryResumeReceive(element, idempotent = null)
if (token != null) {
receive.completeResumeReceive(token)
return receive.offerResult
@@ -41,16 +43,38 @@
}
}
- // result is `E | POLL_EMPTY | Closed`
+ // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
+ protected final override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
+ // offer atomically with select
+ val offerOp = describeTryOffer(element)
+ val failure = select.performAtomicTrySelect(offerOp)
+ if (failure != null) return failure
+ val receive = offerOp.result
+ receive.completeResumeReceive(offerOp.resumeToken!!)
+ return receive.offerResult
+ }
+
+ // result is `E | POLL_FAILED | Closed`
protected final override fun pollInternal(): Any? {
while (true) {
- val send = takeFirstSendOrPeekClosed() ?: return POLL_EMPTY
- val token = send.tryResumeSend()
+ val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
+ val token = send.tryResumeSend(idempotent = null)
if (token != null) {
send.completeResumeSend(token)
return send.pollResult
}
}
}
+
+ // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
+ protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
+ // poll atomically with select
+ val pollOp = describeTryPoll()
+ val failure = select.performAtomicTrySelect(pollOp)
+ if (failure != null) return failure
+ val send = pollOp.result
+ send.completeResumeSend(pollOp.resumeToken!!)
+ return pollOp.pollResult
+ }
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
index 5f5ace0..a258d92 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt
@@ -36,25 +36,29 @@
private val CONSENSUS: AtomicReferenceFieldUpdater<AtomicOp, Any?> =
AtomicReferenceFieldUpdater.newUpdater(AtomicOp::class.java, Any::class.java, "_consensus")
+ @JvmStatic
private val UNDECIDED: Any = Symbol("UNDECIDED")
}
val isDecided: Boolean get() = _consensus !== UNDECIDED
+ fun tryDecide(decision: Any?): Boolean {
+ check(decision !== UNDECIDED)
+ return CONSENSUS.compareAndSet(this, UNDECIDED, decision)
+ }
+
+ private fun decide(decision: Any?): Any? = if (tryDecide(decision)) decision else _consensus
+
abstract fun prepare(): Any? // `null` if Ok, or failure reason
+
abstract fun complete(affected: Any?, failure: Any?) // failure != null if failed to prepare op
// returns `null` on success
final override fun perform(affected: Any?): Any? {
// make decision on status
- var decision: Any?
- while (true) {
- decision = this._consensus
- if (decision !== UNDECIDED) break
- decision = prepare()
- check(decision !== UNDECIDED)
- if (CONSENSUS.compareAndSet(this, UNDECIDED, decision)) break
- }
+ var decision = this._consensus
+ if (decision === UNDECIDED)
+ decision = decide(prepare())
complete(affected, decision)
return decision
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
index 10f95ee..be23aa3 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt
@@ -46,6 +46,11 @@
public typealias RemoveFirstDesc<T> = LockFreeLinkedListNode.RemoveFirstDesc<T>
/**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public typealias AddLastDesc = LockFreeLinkedListNode.AddLastDesc
+
+/**
* Doubly-linked concurrent list node with remove support.
* Based on paper
* ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf)
@@ -85,8 +90,10 @@
_removedRef ?: Removed(this).also { REMOVED_REF.lazySet(this, it) }
@PublishedApi
- internal abstract class CondAddOp(val newNode: Node) : AtomicOp() {
- lateinit var oldNext: Node
+ internal abstract class CondAddOp(
+ @JvmField val newNode: Node
+ ) : AtomicOp() {
+ @JvmField var oldNext: Node? = null
override fun complete(affected: Any?, failure: Any?) {
affected as Node // type assertion
@@ -94,7 +101,7 @@
val update = if (success) newNode else oldNext
if (NEXT.compareAndSet(affected, this, update)) {
// only the thread the makes this update actually finishes add operation
- if (success) newNode.finishAdd(oldNext)
+ if (success) newNode.finishAdd(oldNext!!)
}
}
}
@@ -121,7 +128,7 @@
while (true) {
val prev = this._prev as Node // this sentinel node is never removed
if (prev.next === this) return prev
- helpInsert(prev)
+ helpInsert(prev, null)
}
}
@@ -153,6 +160,8 @@
}
}
+ public fun describeAddLast(node: Node): AddLastDesc = AddLastDesc(this, node)
+
/**
* Adds last item to this list atomically if the [condition] is true.
*/
@@ -224,7 +233,8 @@
val next = this.next
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
check(next !== this) // sanity check -- can be true for sentinel nodes only, but they are never removed
- if (NEXT.compareAndSet(this, next, (next as Node).removed())) {
+ val removed = (next as Node).removed()
+ if (NEXT.compareAndSet(this, next, removed)) {
// was removed successfully (linearized remove) -- fixup the list
finishRemove(next)
return true
@@ -239,11 +249,11 @@
override var originalNext: Node? = null
override fun failure(affected: Node, next: Any): Any? =
if (next is Removed) ALREADY_REMOVED else null
- override fun onPrepare(affected: Node, next: Node): Boolean {
+ override fun onPrepare(affected: Node, next: Node): Any? {
originalNext = next
- return true
+ return null // always success
}
- override fun updatedNext(next: Node) = next.removed()
+ override fun updatedNext(affected: Node, next: Node) = next.removed()
override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
}
}
@@ -283,17 +293,62 @@
// ------ multi-word atomic operations helpers ------
+ public open class AddLastDesc(val queue: Node, val node: Node) : AbstractAtomicDesc() {
+ init {
+ // require freshly allocated node here
+ check(node._next === node && node._prev === node)
+ }
+
+ final override fun takeAffectedNode(op: OpDescriptor): Node {
+ while (true) {
+ val prev = queue._prev as Node // this sentinel node is never removed
+ val next = prev._next
+ if (next === queue) return prev // all is good -> linked properly
+ if (next === op) return prev // all is good -> our operation descriptor is already there
+ if (next is OpDescriptor) { // some other operation descriptor -> help & retry
+ next.perform(prev)
+ continue
+ }
+ // linked improperly -- help insert
+ queue.helpInsert(prev, op)
+ }
+ }
+
+ final override var affectedNode: Node? = null
+ final override val originalNext: Node? get() = queue
+
+ override fun retry(affected: Node, next: Any): Boolean = next !== queue
+
+ override fun onPrepare(affected: Node, next: Node): Any? {
+ affectedNode = affected
+ return null // always success
+ }
+
+ override fun updatedNext(affected: Node, next: Node): Any {
+ // it is invoked only on successfully completion of operation, but this invocation can be stale,
+ // so we must use CAS to set both prev & next pointers
+ PREV.compareAndSet(node, node, affected)
+ NEXT.compareAndSet(node, node, queue)
+ return node
+ }
+
+ override fun finishOnSuccess(affected: Node, next: Node) {
+ node.finishAdd(queue)
+ }
+ }
+
public open class RemoveFirstDesc<T>(val queue: Node) : AbstractAtomicDesc() {
@Suppress("UNCHECKED_CAST")
public val result: T get() = affectedNode!! as T
- final override fun takeAffectedNode(): Node = queue.next as Node
+ final override fun takeAffectedNode(op: OpDescriptor): Node = queue.next as Node
final override var affectedNode: Node? = null
final override var originalNext: Node? = null
// check node predicates here, must signal failure if affect is not of type T
protected override fun failure(affected: Node, next: Any): Any? =
if (affected === queue) LIST_EMPTY else null
+
// validate the resulting node (return false if it should be deleted)
protected open fun validatePrepared(node: T): Boolean = true // false means remove node & retry
@@ -302,38 +357,55 @@
affected.helpDelete() // must help delete, or loose lock-freedom
return true
}
+
@Suppress("UNCHECKED_CAST")
- final override fun onPrepare(affected: Node, next: Node): Boolean {
+ final override fun onPrepare(affected: Node, next: Node): Any? {
check(affected !is LockFreeLinkedListHead)
- if (!validatePrepared(affected as T)) return false
+ if (!validatePrepared(affected as T)) return REMOVE_PREPARED
affectedNode = affected
originalNext = next
- return true
+ return null // ok
}
- final override fun updatedNext(next: Node): Any = next.removed()
+
+ final override fun updatedNext(affected: Node, next: Node): Any = next.removed()
final override fun finishOnSuccess(affected: Node, next: Node) = affected.finishRemove(next)
}
public abstract class AbstractAtomicDesc : AtomicDesc() {
protected abstract val affectedNode: Node?
protected abstract val originalNext: Node?
- protected open fun takeAffectedNode(): Node = affectedNode!!
+ protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!!
protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
- protected abstract fun onPrepare(affected: Node, next: Node): Boolean // false means: remove node & retry
- protected abstract fun updatedNext(next: Node): Any
+ protected abstract fun onPrepare(affected: Node, next: Node): Any? // non-null on failure
+ protected abstract fun updatedNext(affected: Node, next: Node): Any
protected abstract fun finishOnSuccess(affected: Node, next: Node)
// This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation
// It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise)
private class PrepareOp(
- val next: Node,
- val op: AtomicOp,
- val desc: AbstractAtomicDesc
+ @JvmField val next: Node,
+ @JvmField val op: AtomicOp,
+ @JvmField val desc: AbstractAtomicDesc
) : OpDescriptor() {
override fun perform(affected: Any?): Any? {
affected as Node // type assertion
- if (!desc.onPrepare(affected, next)) return REMOVE_PREPARED
+ val decision = desc.onPrepare(affected, next)
+ if (decision != null) {
+ if (decision === REMOVE_PREPARED) {
+ // remove element on failure
+ val removed = next.removed()
+ if (NEXT.compareAndSet(affected, this, removed)) {
+ affected.helpDelete()
+ }
+ } else {
+ // some other failure -- mark as decided
+ op.tryDecide(decision)
+ // undo preparations
+ NEXT.compareAndSet(affected, this, next)
+ }
+ return decision
+ }
check(desc.affectedNode === affected)
check(desc.originalNext === next)
val update: Any = if (op.isDecided) next else op // restore if decision was already reached
@@ -344,7 +416,7 @@
final override fun prepare(op: AtomicOp): Any? {
while (true) { // lock free loop on next
- val affected = takeAffectedNode()
+ val affected = takeAffectedNode(op)
// read its original next pointer first
val next = affected._next
// then see if already reached consensus on overall operation
@@ -362,25 +434,20 @@
val prepareOp = PrepareOp(next as Node, op, this)
if (NEXT.compareAndSet(affected, next, prepareOp)) {
// prepared -- complete preparations
- val prepFail = prepareOp.perform(affected) ?: return null // prepared successfully
- check(prepFail === REMOVE_PREPARED) // the only way for prepare to fail
- if (NEXT.compareAndSet(affected, prepareOp, next.removed())) {
- affected.helpDelete()
- }
+ val prepFail = prepareOp.perform(affected)
+ if (prepFail === REMOVE_PREPARED) continue // retry
+ return prepFail
}
}
}
final override fun complete(op: AtomicOp, failure: Any?) {
val success = failure == null
- val update = if (success) updatedNext(originalNext!!) else originalNext
- val affectedNode = affectedNode
- if (affectedNode == null) {
- check(!success)
- return
- }
+ val affectedNode = affectedNode ?: run { check(!success); return }
+ val originalNext = this.originalNext ?: run { check(!success); return }
+ val update = if (success) updatedNext(affectedNode, originalNext) else originalNext
if (NEXT.compareAndSet(affectedNode, op, update)) {
- if (success) finishOnSuccess(affectedNode, originalNext!!)
+ if (success) finishOnSuccess(affectedNode, originalNext)
}
}
}
@@ -394,7 +461,7 @@
if (PREV.compareAndSet(next, nextPrev, this)) {
if (this.next is Removed) {
// already removed
- next.helpInsert(nextPrev as Node)
+ next.helpInsert(nextPrev as Node, null)
}
return
}
@@ -403,7 +470,7 @@
private fun finishRemove(next: Node) {
helpDelete()
- next.helpInsert(_prev.unwrap())
+ next.helpInsert(_prev.unwrap(), null)
}
private fun markPrev(): Node {
@@ -454,12 +521,17 @@
}
// fixes prev links from this node
- private fun helpInsert(_prev: Node) {
+ private fun helpInsert(_prev: Node, op: OpDescriptor?) {
var prev: Node = _prev
var last: Node? = null // will be set so that last.next === prev
while (true) {
// move the the left until first non-removed node
- val prevNext = prev.next
+ val prevNext = prev._next
+ if (prevNext === op) return // part of the same op -- don't recurse
+ if (prevNext is OpDescriptor) { // help & retry
+ prevNext.perform(prev)
+ continue
+ }
if (prevNext is Removed) {
if (last !== null) {
prev.markPrev()
@@ -490,9 +562,11 @@
check(prev === this._prev)
check(next === this._next)
}
+
+ override fun toString(): String = "${this::class.java.simpleName}@${Integer.toHexString(System.identityHashCode(this))}"
}
-private class Removed(val ref: Node) {
+private class Removed(@JvmField val ref: Node) {
override fun toString(): String = "Removed[$ref]"
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
new file mode 100644
index 0000000..dfb059c
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
+import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.SendChannel
+import kotlinx.coroutines.experimental.internal.AtomicDesc
+import kotlin.coroutines.experimental.Continuation
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
+
+/**
+ * Scope for [select] invocation.
+ */
+public interface SelectBuilder<in R> : CoroutineScope {
+ /**
+ * Clause for [Deferred.await] suspending function that selects the given [block] with the deferred value is
+ * resolved. The [select] invocation fails if the deferred value completes exceptionally (either fails or
+ * it cancelled).
+ */
+ public fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)
+
+ /**
+ * Clause for [SendChannel.send] suspending function that selects the given [block] when the [element] is sent to
+ * the channel. The [select] invocation fails with [ClosedSendChannelException] if the channel
+ * [isClosedForSend][SendChannel.isClosedForSend] _normally_ or with the original
+ * [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)
+
+ /**
+ * Clause for [ReceiveChannel.receive] suspending function that selects the given [block] with the element that
+ * is received from the channel. The [select] invocation fails with [ClosedReceiveChannelException] if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_ or with the original
+ * [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)
+
+ /**
+ * Clause for [ReceiveChannel.receiveOrNull] suspending function that selects the given [block] with the element that
+ * is received from the channel or selects the given [block] with `null` if if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_. The [select] invocation fails with
+ * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ */
+ public fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R)
+}
+
+/**
+ * Internal representation of select instance. This instance is called _selected_ when
+ * the clause to execute is already picked.
+ *
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public interface SelectInstance<in R> {
+ /**
+ * Returns `true` when this [select] statement had already picked a clause to execute.
+ */
+ public val isSelected: Boolean
+
+ /**
+ * Tries to select this instance.
+ */
+ public fun trySelect(idempotent: Any?): Boolean
+
+ /**
+ * Performs action atomically with [trySelect].
+ */
+ public fun performAtomicTrySelect(desc: AtomicDesc): Any?
+
+ /**
+ * Performs action atomically when [isSelected] is `false`.
+ */
+ public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
+
+ public val completion: Continuation<R>
+
+ public fun resumeSelectWithException(exception: Throwable)
+
+ public fun invokeOnCompletion(handler: CompletionHandler): Job.Registration
+
+ public fun unregisterOnCompletion(registration: Job.Registration)
+}
+
+/**
+ * Waits for the result of multiple suspending functions simultaneously, which are specified using _clauses_
+ * in the [builder] scope of this select invocation. The caller is suspended until one of the clauses
+ * is either _selected_ or _fails_.
+ *
+ * At most one clause is *atomically* selected and its block is executed. The result of the selected clause
+ * becomes the result of the select. If any clause _fails_, then the select invocation produces the
+ * corresponding exception. No clause is selected in this case.
+ *
+ * There is no `default` clause for select expression. Instead, each selectable suspending function has the
+ * corresponding non-suspending version that can be used with a regular `when` expression to select one
+ * of the alternatives or to perform default (`else`) action if none of them can be immediately selected.
+ *
+ * | **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version**
+ * | ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
+ * | [Deferred] | [await][Deferred.await] | [onAwait][SelectBuilder.onAwait] | [isCompleted][Deferred.isCompleted]
+ * | [SendChannel] | [send][SendChannel.send] | [onSend][SelectBuilder.onSend] | [offer][SendChannel.offer]
+ * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][SelectBuilder.onReceive] | [poll][ReceiveChannel.poll]
+ * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended select is *atomic* -- when this function
+ * throws [CancellationException] it means that no clause was selected.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+public inline suspend fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
+ suspendCoroutineOrReturn { cont ->
+ val scope = SelectBuilderImpl(cont)
+ try {
+ builder(scope)
+ } catch (e: Throwable) {
+ scope.handleBuilderException(e)
+ }
+ scope.initSelectResult()
+ }
+
+/*
+ :todo: It is best to rewrite this class without the use of CancellableContinuationImpl and JobSupport infrastructure
+ This way JobSupport will not have to provide trySelect(idempotent) operation can can save some checks and bytes
+ to carry on that idempotent start token.
+ */
+@PublishedApi
+internal class SelectBuilderImpl<in R>(
+ delegate: Continuation<R>
+) : CancellableContinuationImpl<R>(delegate, active = false), SelectBuilder<R>, SelectInstance<R> {
+ @PublishedApi
+ internal fun handleBuilderException(e: Throwable) {
+ if (trySelect(idempotent = null)) {
+ val token = tryResumeWithException(e)
+ if (token != null)
+ completeResume(token)
+ else
+ handleCoroutineException(context, e)
+ }
+ }
+
+ @PublishedApi
+ internal fun initSelectResult(): Any? {
+ if (!isSelected) initCancellability()
+ return getResult()
+ }
+
+ // coroutines that are started inside this select are directly subordinate to the parent job
+ override fun createContext(): CoroutineContext = delegate.context
+
+ override fun onParentCompletion(cause: Throwable?) {
+ /*
+ Select is cancelled only when no clause was selected yet. If a clause was selected, then
+ it is the concern of the coroutine that was started by that clause to cancel on its suspension
+ points.
+ */
+ if (trySelect(null))
+ cancel(cause)
+ }
+
+ override fun defaultResumeMode(): Int = MODE_DIRECT // all resumes through completion are dispatched directly
+
+ override val completion: Continuation<R> get() {
+ check(isSelected) { "Must be selected first" }
+ return this
+ }
+
+ override fun resumeSelectWithException(exception: Throwable) {
+ check(isSelected) { "Must be selected first" }
+ resumeWithException(exception, mode = 0)
+ }
+
+ override fun <T> Deferred<T>.onAwait(block: suspend (T) -> R) {
+ registerSelectAwait(this@SelectBuilderImpl, block)
+ }
+
+ override fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R) {
+ registerSelectSend(this@SelectBuilderImpl, element, block)
+ }
+
+ override fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R) {
+ registerSelectReceive(this@SelectBuilderImpl, block)
+ }
+
+ override fun <E> ReceiveChannel<E>.onReceiveOrNull(block: suspend (E?) -> R) {
+ registerSelectReceiveOrNull(this@SelectBuilderImpl, block)
+ }
+
+ override fun unregisterOnCompletion(registration: Job.Registration) {
+ invokeOnCompletion(UnregisterOnCompletion(this, registration))
+ }
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CancellableContinuationImplTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CancellableContinuationImplTest.kt
new file mode 100644
index 0000000..7c5afee
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CancellableContinuationImplTest.kt
@@ -0,0 +1,84 @@
+package kotlinx.coroutines.experimental
+
+import org.junit.Test
+import kotlin.coroutines.experimental.Continuation
+import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.EmptyCoroutineContext
+import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
+
+class CancellableContinuationImplTest {
+ @Test
+ fun testIdempotentSelectResume() {
+ var resumed = false
+ val delegate = object : Continuation<String> {
+ override val context: CoroutineContext get() = EmptyCoroutineContext
+ override fun resume(value: String) {
+ check(value === "OK")
+ resumed = true
+ }
+ override fun resumeWithException(exception: Throwable) { error("Should not happen") }
+ }
+ val c = CancellableContinuationImpl<String>(delegate, false)
+ check(!c.isSelected)
+ check(!c.isActive)
+ check(c.trySelect("SELECT"))
+ check(c.isSelected)
+ check(c.isActive)
+ check(!c.start())
+ check(!c.trySelect("OTHER"))
+ check(c.trySelect("SELECT"))
+ val token = c.tryResume("OK", "RESUME") ?: error("Failed")
+ check(c.isSelected)
+ check(!c.isActive)
+ check(null == c.tryResume("FAIL"))
+ check(!c.start())
+ check(!c.trySelect("OTHER"))
+ check(c.trySelect("SELECT"))
+ check(token === c.tryResume("OK", "RESUME"))
+ check(c.getResult() === COROUTINE_SUSPENDED)
+ check(!resumed)
+ c.completeResume(token)
+ check(resumed)
+ check(c.isSelected)
+ check(!c.isActive)
+ check(null == c.tryResume("FAIL"))
+ check(!c.start())
+ check(!c.trySelect("OTHER"))
+ check(c.trySelect("SELECT"))
+ check(token === c.tryResume("OK", "RESUME"))
+ check(c.getResult() === "OK")
+ }
+
+ @Test
+ fun testIdempotentSelectCancel() {
+ var resumed = false
+ val delegate = object : Continuation<String> {
+ override val context: CoroutineContext get() = EmptyCoroutineContext
+ override fun resume(value: String) { error("Should not happen") }
+ override fun resumeWithException(exception: Throwable) {
+ check(exception is CancellationException)
+ resumed = true
+ }
+ }
+ val c = CancellableContinuationImpl<String>(delegate, false)
+ check(!c.isSelected)
+ check(!c.isActive)
+ check(c.trySelect("SELECT"))
+ check(c.isSelected)
+ check(c.isActive)
+ check(!c.start())
+ check(!c.trySelect("OTHER"))
+ check(c.trySelect("SELECT"))
+ check(c.getResult() === COROUTINE_SUSPENDED)
+ check(!resumed)
+ c.cancel()
+ check(resumed)
+ check(c.isSelected)
+ check(!c.isActive)
+ check(null == c.tryResume("FAIL"))
+ check(!c.start())
+ check(!c.trySelect("OTHER"))
+ check(c.trySelect("SELECT"))
+ check(null === c.tryResume("OK", "RESUME"))
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
index 40b3261..f955982 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
@@ -26,18 +26,28 @@
var finished = AtomicBoolean()
var error = AtomicReference<Throwable>()
+ public fun error(message: Any): Nothing {
+ val exception = IllegalStateException(message.toString())
+ error.compareAndSet(null, exception)
+ throw exception
+ }
+
+ public inline fun check(value: Boolean, lazyMessage: () -> Any): Unit {
+ if (!value) error(lazyMessage())
+ }
+
fun expect(index: Int) {
val wasIndex = actionIndex.incrementAndGet()
check(index == wasIndex) { "Expecting action index $index but it is actually $wasIndex" }
}
fun expectUnreached() {
- throw IllegalStateException("Should not be reached").also { error.compareAndSet(null, it) }
+ error("Should not be reached")
}
fun finish(index: Int) {
expect(index)
- finished.set(true)
+ check(!finished.getAndSet(true)) { "Should call 'finish(...)' at most once" }
}
@After
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
index ec20cbc..1363785 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt
@@ -17,12 +17,17 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.selects.select
+import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.util.*
-import org.junit.Assert.*
+import java.util.concurrent.atomic.AtomicReference
+/**
+ * Tests cancel atomicity for channel send & receive operations, including their select versions.
+ */
@RunWith(Parameterized::class)
class ChannelAtomicCancelStressTest(val kind: TestChannelKind) {
companion object {
@@ -34,8 +39,8 @@
val TEST_DURATION = 3000L
val channel = kind.create()
- val senderDone = RendezvousChannel<Boolean>()
- val receiverDone = RendezvousChannel<Boolean>()
+ val senderDone = ArrayChannel<Boolean>(1)
+ val receiverDone = ArrayChannel<Boolean>(1)
var lastSent = 0
var lastReceived = 0
@@ -46,36 +51,53 @@
var missedCnt = 0
var dupCnt = 0
+ val failed = AtomicReference<Throwable>()
+
lateinit var sender: Job
lateinit var receiver: Job
+ fun fail(e: Throwable) = failed.compareAndSet(null, e)
+
+ inline fun cancellable(done: ArrayChannel<Boolean>, block: () -> Unit) {
+ try {
+ block()
+ } catch (e: Throwable) {
+ if (e !is CancellationException) fail(e)
+ throw e
+ } finally {
+ if (!done.offer(true))
+ fail(IllegalStateException("failed to offer to done channel"))
+ }
+ }
+
@Test
- fun testAtomicCancelStress() = runBlocking {
+ fun testAtomicCancelStress() = runBlocking<Unit> {
val deadline = System.currentTimeMillis() + TEST_DURATION
launchSender()
launchReceiver()
val rnd = Random()
- while (System.currentTimeMillis() < deadline) {
+ while (System.currentTimeMillis() < deadline && failed.get() == null) {
when (rnd.nextInt(3)) {
0 -> { // cancel & restart sender
stopSender()
launchSender()
}
- 1 -> { // cancel & restrat receiver
- stopReceier()
+ 1 -> { // cancel & restart receiver
+ stopReceiver()
launchReceiver()
}
2 -> yield() // just yield (burn a little time)
}
}
stopSender()
- stopReceier()
+ stopReceiver()
println(" Sent $lastSent ints to channel")
println(" Received $lastReceived ints from channel")
println(" Stopped sender $stoppedSender times")
println("Stopped receiver $stoppedReceiver times")
println(" Missed $missedCnt ints")
println(" Duplicated $dupCnt ints")
+ failed.get()?.let { throw it }
assertEquals(0, missedCnt)
assertEquals(0, dupCnt)
assertEquals(lastSent, lastReceived)
@@ -83,14 +105,18 @@
fun launchSender() {
sender = launch(CommonPool) {
- try {
+ val rnd = Random()
+ cancellable(senderDone) {
while (true) {
val trySend = lastSent + 1
- channel.send(trySend)
+ when (rnd.nextInt(2)) {
+ 0 -> channel.send(trySend)
+ 1 -> select { channel.onSend(trySend) {} }
+ else -> error("cannot happen")
+ }
+
lastSent = trySend // update on success
}
- } finally {
- run(NonCancellable) { senderDone.send(true) }
}
}
}
@@ -103,9 +129,14 @@
fun launchReceiver() {
receiver = launch(CommonPool) {
- try {
+ val rnd = Random()
+ cancellable(receiverDone) {
while (true) {
- val received = channel.receive()
+ val received = when (rnd.nextInt(2)) {
+ 0 -> channel.receive()
+ 1 -> select { channel.onReceive { it } }
+ else -> error("cannot happen")
+ }
val expected = lastReceived + 1
if (received > expected)
missedCnt++
@@ -113,13 +144,11 @@
dupCnt++
lastReceived = received
}
- } finally {
- run(NonCancellable) { receiverDone.send(true) }
}
}
}
- suspend fun stopReceier() {
+ suspend fun stopReceiver() {
stoppedReceiver++
receiver.cancel()
receiverDone.receive()
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
index a2d71a2..0f5878a 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt
@@ -17,12 +17,14 @@
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.selects.select
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
-import org.junit.Assert.*
@RunWith(Parameterized::class)
class ChannelSendReceiveStressTest(
@@ -35,12 +37,13 @@
@JvmStatic
fun params(): Collection<Array<Any>> =
listOf(1, 2, 10).flatMap { nSenders ->
- listOf(1, 6).flatMap { nReceivers ->
+ listOf(1, 10).flatMap { nReceivers ->
TestChannelKind.values().map { arrayOf<Any>(it, nSenders, nReceivers) }
}
}
}
+ val timeLimit = 30_000L // 30 sec
val nEvents = 1_000_000
val channel = kind.create()
@@ -55,24 +58,34 @@
val receivers = List(nReceivers) { receiverIndex ->
// different event receivers use different code
launch(CommonPool + CoroutineName("receiver$receiverIndex")) {
- when (receiverIndex % 3) {
+ when (receiverIndex % 5) {
0 -> doReceive(receiverIndex)
1 -> doReceiveOrNull(receiverIndex)
2 -> doIterator(receiverIndex)
+ 3 -> doReceiveSelect(receiverIndex)
+ 4 -> doReceiveSelectOrNull(receiverIndex)
}
receiversCompleted.incrementAndGet()
}
}
val senders = List(nSenders) { senderIndex ->
launch(CommonPool + CoroutineName("sender$senderIndex")) {
- for (i in senderIndex until nEvents step nSenders)
- channel.send(i)
+ when (senderIndex % 2) {
+ 0 -> doSend(senderIndex)
+ 1 -> doSendSelect(senderIndex)
+ }
sendersCompleted.incrementAndGet()
}
}
- senders.forEach { it.join() }
- channel.close()
- receivers.forEach { it.join() }
+ try {
+ withTimeout(timeLimit) {
+ senders.forEach { it.join() }
+ channel.close()
+ receivers.forEach { it.join() }
+ }
+ } catch (e: CancellationException) {
+ println("!!! Test timed out $e")
+ }
println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
println("Completed successfully ${sendersCompleted.get()} sender coroutines")
println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
@@ -90,9 +103,19 @@
}
}
+ private suspend fun doSend(senderIndex: Int) {
+ for (i in senderIndex until nEvents step nSenders)
+ channel.send(i)
+ }
+
+ private suspend fun doSendSelect(senderIndex: Int) {
+ for (i in senderIndex until nEvents step nSenders)
+ select<Unit> { channel.onSend(i) { Unit } }
+ }
+
private fun doReceived(receiverIndex: Int, event: Int) {
if (received.put(event, event) != null) {
- println("Duplicate event $event")
+ println("Duplicate event $event at $receiverIndex")
dupes.incrementAndGet()
}
receivedBy[receiverIndex]++
@@ -103,7 +126,6 @@
try { doReceived(receiverIndex, channel.receive()) }
catch (ex: ClosedReceiveChannelException) { break }
}
-
}
private suspend fun doReceiveOrNull(receiverIndex: Int) {
@@ -117,4 +139,20 @@
doReceived(receiverIndex, event)
}
}
+
+ private suspend fun doReceiveSelect(receiverIndex: Int) {
+ while (true) {
+ try {
+ val event = select<Int> { channel.onReceive { it } }
+ doReceived(receiverIndex, event)
+ } catch (ex: ClosedReceiveChannelException) { break }
+ }
+ }
+
+ private suspend fun doReceiveSelectOrNull(receiverIndex: Int) {
+ while (true) {
+ val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break
+ doReceived(receiverIndex, event)
+ }
+ }
}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicRemoveStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicRemoveStressTest.kt
deleted file mode 100644
index 456681d..0000000
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicRemoveStressTest.kt
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright 2016-2017 JetBrains s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kotlinx.coroutines.experimental.internal
-
-import org.junit.Assert.*
-import org.junit.Test
-import java.util.*
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.concurrent.thread
-
-/**
- * This stress test has 4 threads adding randomly first to the list and them immediately undoing
- * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically).
- */
-class LockFreeLinkedListAtomicRemoveStressTest {
- data class IntNode(val i: Int) : LockFreeLinkedListNode()
-
- val threads = mutableListOf<Thread>()
- val nLists = 4
- val nRemoverThreads = 4
- val timeout = 5000L
- val completedAdder = AtomicInteger()
- val completedRemover = AtomicInteger()
-
- val lists = Array(nLists) { LockFreeLinkedListHead() }
-
- val undone = AtomicInteger()
- val missed = AtomicInteger()
- val removed = AtomicInteger()
-
- @Test
- fun testStress() {
- val deadline = System.currentTimeMillis() + timeout
- repeat(nLists) { threadId ->
- threads += thread(start = false, name = "adder-$threadId") {
- val rnd = Random()
- val list = lists[threadId]
- while (System.currentTimeMillis() < deadline) {
- var node: IntNode? = IntNode(threadId)
- when (rnd.nextInt(3)) {
- 0 -> list.addLast(node!!)
- 1 -> assertTrue(list.addLastIf(node!!, { true })) // just to test conditional add
- 2 -> { // just to test failed conditional add and burn some time
- assertFalse(list.addLastIf(node!!, { false }))
- node = null
- }
- else -> error("Cannot happen")
- }
- if (node == null) continue
- when (rnd.nextInt(3)) {
- 0 -> {} // nothing -- be quick
- 1 -> {
- // burn some time
- Thread.yield()
- }
- 2 -> {
- // burn more time
- Thread.sleep(1)
- }
- else -> error("Cannot happen")
- }
- // undo add
- if (node.remove())
- undone.incrementAndGet()
- else
- missed.incrementAndGet()
- }
- completedAdder.incrementAndGet()
- }
- }
- repeat(nRemoverThreads) { threadId ->
- threads += thread(start = false, name = "remover-$threadId") {
- val rnd = Random()
- while (System.currentTimeMillis() < deadline) {
- val idx1 = rnd.nextInt(nLists - 1)
- val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
- check(idx1 < idx2) // that is our global order
- val list1 = lists[idx1]
- val list2 = lists[idx2]
- val remove1 = list1.describeRemoveFirst()
- val remove2 = list2.describeRemoveFirst()
- val op = object : AtomicOp() {
- override fun prepare(): Any? = remove1.prepare(this) ?: remove2.prepare(this)
- override fun complete(affected: Any?, failure: Any?) {
- remove1.complete(this, failure)
- remove2.complete(this, failure)
- }
- }
- val success = op.perform(null) == null
- if (success) removed.addAndGet(2)
-
- }
- completedRemover.incrementAndGet()
- }
- }
- threads.forEach { it.start() }
- threads.forEach { it.join() }
- println("Completed successfully ${completedAdder.get()} adder threads")
- println("Completed successfully ${completedRemover.get()} remover threads")
- println(" Adders undone ${undone.get()} node additions")
- println(" Adders missed ${missed.get()} nodes")
- println("Remover removed ${removed.get()} nodes")
- assertEquals(nLists, completedAdder.get())
- assertEquals(nRemoverThreads, completedRemover.get())
- assertEquals(missed.get(), removed.get())
- assertTrue(undone.get() > 0)
- assertTrue(missed.get() > 0)
- lists.forEach { it.validate() }
- }
-}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressTest.kt
new file mode 100644
index 0000000..7c3d676
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListAtomicStressTest.kt
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+import org.junit.Assert.*
+import org.junit.Test
+import java.util.*
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.concurrent.thread
+
+/**
+ * This stress test has 4 threads adding randomly to the list and them immediately undoing
+ * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically).
+ */
+class LockFreeLinkedListAtomicStressTest {
+ data class IntNode(val i: Int) : LockFreeLinkedListNode()
+
+ val threads = mutableListOf<Thread>()
+ val nLists = 4
+ val nAdderThreads = 4
+ val nRemoverThreads = 4
+ val timeout = 5000L
+ val completedAdder = AtomicInteger()
+ val completedRemover = AtomicInteger()
+
+ val lists = Array(nLists) { LockFreeLinkedListHead() }
+
+ val undone = AtomicInteger()
+ val missed = AtomicInteger()
+ val removed = AtomicInteger()
+
+ @Test
+ fun testStress() {
+ val deadline = System.currentTimeMillis() + timeout
+ repeat(nAdderThreads) { threadId ->
+ threads += thread(start = false, name = "adder-$threadId") {
+ val rnd = Random()
+ while (System.currentTimeMillis() < deadline) {
+ when (rnd.nextInt(4)) {
+ 0 -> {
+ val list = lists[rnd.nextInt(nLists)]
+ val node = IntNode(threadId)
+ list.addLast(node)
+ burnTime(rnd)
+ tryRemove(node)
+ }
+ 1 -> {
+ // just to test conditional add
+ val list = lists[rnd.nextInt(nLists)]
+ val node = IntNode(threadId)
+ assertTrue(list.addLastIf(node, { true }))
+ burnTime(rnd)
+ tryRemove(node)
+ }
+ 2 -> {
+ // just to test failed conditional add and burn some time
+ val list = lists[rnd.nextInt(nLists)]
+ val node = IntNode(threadId)
+ assertFalse(list.addLastIf(node, { false }))
+ burnTime(rnd)
+ }
+ 3 -> {
+ // add two atomically
+ val idx1 = rnd.nextInt(nLists - 1)
+ val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
+ check(idx1 < idx2) // that is our global order
+ val list1 = lists[idx1]
+ val list2 = lists[idx2]
+ val node1 = IntNode(threadId)
+ val node2 = IntNode(-threadId - 1)
+ val add1 = list1.describeAddLast(node1)
+ val add2 = list2.describeAddLast(node2)
+ val op = object : AtomicOp() {
+ override fun prepare(): Any? = add1.prepare(this) ?: add2.prepare(this)
+ override fun complete(affected: Any?, failure: Any?) {
+ add1.complete(this, failure)
+ add2.complete(this, failure)
+ }
+ }
+ assertTrue(op.perform(null) == null)
+ burnTime(rnd)
+ tryRemove(node1)
+ tryRemove(node2)
+ }
+ else -> error("Cannot happen")
+ }
+ }
+ completedAdder.incrementAndGet()
+ }
+ }
+ repeat(nRemoverThreads) { threadId ->
+ threads += thread(start = false, name = "remover-$threadId") {
+ val rnd = Random()
+ while (System.currentTimeMillis() < deadline) {
+ val idx1 = rnd.nextInt(nLists - 1)
+ val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
+ check(idx1 < idx2) // that is our global order
+ val list1 = lists[idx1]
+ val list2 = lists[idx2]
+ val remove1 = list1.describeRemoveFirst()
+ val remove2 = list2.describeRemoveFirst()
+ val op = object : AtomicOp() {
+ override fun prepare(): Any? = remove1.prepare(this) ?: remove2.prepare(this)
+ override fun complete(affected: Any?, failure: Any?) {
+ remove1.complete(this, failure)
+ remove2.complete(this, failure)
+ }
+ }
+ val success = op.perform(null) == null
+ if (success) removed.addAndGet(2)
+
+ }
+ completedRemover.incrementAndGet()
+ }
+ }
+ threads.forEach(Thread::start)
+ threads.forEach(Thread::join)
+ println("Completed successfully ${completedAdder.get()} adder threads")
+ println("Completed successfully ${completedRemover.get()} remover threads")
+ println(" Adders undone ${undone.get()} node additions")
+ println(" Adders missed ${missed.get()} nodes")
+ println("Remover removed ${removed.get()} nodes")
+ assertEquals(nAdderThreads, completedAdder.get())
+ assertEquals(nRemoverThreads, completedRemover.get())
+ assertEquals(missed.get(), removed.get())
+ assertTrue(undone.get() > 0)
+ assertTrue(missed.get() > 0)
+ lists.forEach { it.validate() }
+ }
+
+ private fun burnTime(rnd: Random) {
+ when (rnd.nextInt(3)) {
+ 0 -> {} // nothing -- be quick
+ 1 -> Thread.yield() // burn some time
+ 2 -> Thread.sleep(1) // burn more time
+ else -> error("Cannot happen")
+ }
+ }
+
+ private fun tryRemove(node: IntNode) {
+ if (node.remove())
+ undone.incrementAndGet()
+ else
+ missed.incrementAndGet()
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt
index 2688256..6b47e42 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt
@@ -86,13 +86,13 @@
fun testAtomicOpsSingle() {
val list = LockFreeLinkedListHead()
assertContents(list)
- val n1 = IntNode(1).also { list.addLast(it) }
+ val n1 = IntNode(1).also { single(list.describeAddLast(it)) }
assertContents(list, 1)
- val n2 = IntNode(2).also { list.addLast(it) }
+ val n2 = IntNode(2).also { single(list.describeAddLast(it)) }
assertContents(list, 1, 2)
- val n3 = IntNode(3).also { list.addLast(it) }
+ val n3 = IntNode(3).also { single(list.describeAddLast(it)) }
assertContents(list, 1, 2, 3)
- val n4 = IntNode(4).also { list.addLast(it) }
+ val n4 = IntNode(4).also { single(list.describeAddLast(it)) }
assertContents(list, 1, 2, 3, 4)
single(n3.describeRemove()!!)
assertContents(list, 1, 2, 4)
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt
new file mode 100644
index 0000000..c9cf44c
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt
@@ -0,0 +1,309 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.channels.ArrayChannel
+import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
+import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.yield
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class SelectArrayChannelTest : TestBase() {
+ @Test
+ fun testSelectSendSuccess() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ launch(context) {
+ expect(2)
+ assertEquals("OK", channel.receive())
+ finish(6)
+ }
+ yield() // to launched coroutine
+ expect(3)
+ select<Unit> {
+ channel.onSend("OK") {
+ expect(4)
+ }
+ }
+ expect(5)
+ }
+
+ @Test
+ fun testSelectSendSuccessWithDefault() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ launch(context) {
+ expect(2)
+ assertEquals("OK", channel.receive())
+ finish(6)
+ }
+ yield() // to launched coroutine
+ expect(3)
+ select<Unit> {
+ channel.onSend("OK") {
+ expect(4)
+ }
+ default {
+ expectUnreached()
+ }
+ }
+ expect(5)
+ }
+
+ @Test
+ fun testSelectSendReceiveBuf() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ select<Unit> {
+ channel.onSend("OK") {
+ expect(2)
+ }
+ }
+ expect(3)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(4)
+ assertEquals("OK", v)
+ }
+ }
+ finish(5)
+ }
+
+ @Test
+ fun testSelectSendWait() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ launch(context) {
+ expect(4)
+ assertEquals("BUF", channel.receive())
+ expect(5)
+ assertEquals("OK", channel.receive())
+ expect(6)
+ }
+ expect(2)
+ channel.send("BUF")
+ expect(3)
+ select<Unit> {
+ channel.onSend("OK") {
+ expect(7)
+ }
+ }
+ finish(8)
+ }
+
+ @Test
+ fun testSelectReceiveSuccess() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ channel.send("OK")
+ expect(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(3)
+ assertEquals("OK", v)
+ }
+ }
+ finish(4)
+ }
+
+ @Test
+ fun testSelectReceiveSuccessWithDefault() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ channel.send("OK")
+ expect(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(3)
+ assertEquals("OK", v)
+ }
+ default {
+ expectUnreached()
+ }
+ }
+ finish(4)
+ }
+
+ @Test
+ fun testSelectReceiveWaitWithDefault() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ select<Unit> {
+ channel.onReceive { v ->
+ expectUnreached()
+ }
+ default {
+ expect(2)
+ }
+ }
+ expect(3)
+ channel.send("BUF")
+ expect(4)
+ // make sure second send blocks (select above is over)
+ launch(context) {
+ expect(6)
+ channel.send("CHK")
+ finish(10)
+ }
+ expect(5)
+ yield()
+ expect(7)
+ assertEquals("BUF", channel.receive())
+ expect(8)
+ assertEquals("CHK", channel.receive())
+ expect(9)
+ }
+
+ @Test
+ fun testSelectReceiveWait() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ launch(context) {
+ expect(3)
+ channel.send("OK")
+ expect(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(5)
+ assertEquals("OK", v)
+ }
+ }
+ finish(6)
+ }
+
+ @Test(expected = ClosedReceiveChannelException::class)
+ fun testSelectReceiveClosed() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ channel.close()
+ finish(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expectUnreached()
+ }
+ }
+ expectUnreached()
+ }
+
+ @Test(expected = ClosedReceiveChannelException::class)
+ fun testSelectReceiveWaitClosed() = runBlocking<Unit> {
+ expect(1)
+ val channel = ArrayChannel<String>(1)
+ launch(context) {
+ expect(3)
+ channel.close()
+ finish(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expectUnreached()
+ }
+ }
+ expectUnreached()
+ }
+
+ @Test
+ fun testSelectSendResourceCleanup() = runBlocking<Unit> {
+ val channel = ArrayChannel<Int>(1)
+ val n = 10_000_000
+ expect(1)
+ channel.send(-1) // fill the buffer, so all subsequent sends cannot proceed
+ repeat(n) { i ->
+ select {
+ channel.onSend(i) { expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ @Test
+ fun testSelectReceiveResourceCleanup() = runBlocking<Unit> {
+ val channel = ArrayChannel<Int>(1)
+ val n = 10_000_000
+ expect(1)
+ repeat(n) { i ->
+ select {
+ channel.onReceive { v -> expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ @Test
+ fun testSelectReceiveDispatchNonSuspending() = runBlocking<Unit> {
+ val channel = ArrayChannel<Int>(1)
+ expect(1)
+ channel.send(42)
+ expect(2)
+ launch(context) {
+ expect(4)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(5)
+ assertEquals(42, v)
+ expect(6)
+ }
+ }
+ expect(7) // returns from select without further dispatch
+ }
+ expect(3)
+ yield() // to launched
+ finish(8)
+ }
+
+ @Test
+ fun testSelectReceiveDispatchNonSuspending2() = runBlocking<Unit> {
+ val channel = ArrayChannel<Int>(1)
+ expect(1)
+ channel.send(42)
+ expect(2)
+ launch(context) {
+ expect(4)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(5)
+ assertEquals(42, v)
+ expect(6)
+ yield() // back to main
+ expect(8)
+ }
+ }
+ expect(9) // returns from select without further dispatch
+ }
+ expect(3)
+ yield() // to launched
+ expect(7)
+ yield() // again
+ finish(10)
+ }
+
+ // only for debugging
+ internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
+ this as SelectBuilderImpl // type assertion
+ if (!trySelect(null)) return
+ block.startUndispatchedCoroutine(this)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectDeferredTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectDeferredTest.kt
new file mode 100644
index 0000000..69d770c
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectDeferredTest.kt
@@ -0,0 +1,88 @@
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class SelectDeferredTest : TestBase() {
+ @Test
+ fun testSimpleReturnsImmediately() = runBlocking<Unit> {
+ expect(1)
+ val d1 = async<Int>(context) {
+ expect(3)
+ 42
+ }
+ expect(2)
+ val res = select<String> {
+ d1.onAwait { v ->
+ expect(4)
+ assertEquals(42, v)
+ "OK"
+ }
+ }
+ expect(5)
+ assertEquals("OK", res)
+ finish(6)
+ }
+
+ @Test
+ fun testSimpleWithYield() = runBlocking<Unit> {
+ expect(1)
+ val d1 = async<Int>(context) {
+ expect(3)
+ 42
+ }
+ launch(context) {
+ expect(5)
+ yield() // back to main
+ expect(9)
+ }
+ expect(2)
+ val res = select<String> {
+ d1.onAwait { v ->
+ expect(4)
+ assertEquals(42, v)
+ yield() // to launch
+ expect(6)
+ "OK"
+ }
+ }
+ expect(7)
+ assertEquals("OK", res)
+ expect(8)
+ yield() // to launch again
+ finish(10)
+ }
+
+ @Test
+ fun testSelectTwo() = runBlocking<Unit> {
+ expect(1)
+ val d1 = async<String>(context) {
+ expect(3)
+ yield() // to the other deffered
+ expect(6)
+ "d1"
+ }
+ val d2 = async<String>(context) {
+ expect(4)
+ "d2" // returns result
+ }
+ expect(2)
+ val res = select<String> {
+ d1.onAwait { v1 ->
+ expectUnreached()
+ "FAIL"
+ }
+ d2.onAwait { v2 ->
+ expect(5)
+ assertEquals("d2", v2)
+ yield() // to first deferred
+ expect(7)
+ "OK"
+ }
+ }
+ assertEquals("OK", res)
+ finish(8)
+ }
+
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt
new file mode 100644
index 0000000..6dc68c8
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt
@@ -0,0 +1,329 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.channels.ClosedReceiveChannelException
+import kotlinx.coroutines.experimental.channels.RendezvousChannel
+import kotlinx.coroutines.experimental.intrinsics.startUndispatchedCoroutine
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
+import kotlinx.coroutines.experimental.yield
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class SelectRendezvousChannelTest : TestBase() {
+ @Test
+ fun testSelectSendSuccess() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ launch(context) {
+ expect(2)
+ assertEquals("OK", channel.receive())
+ finish(6)
+ }
+ yield() // to launched coroutine
+ expect(3)
+ select<Unit> {
+ channel.onSend("OK") {
+ expect(4)
+ }
+ }
+ expect(5)
+ }
+
+ @Test
+ fun testSelectSendSuccessWithDefault() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ launch(context) {
+ expect(2)
+ assertEquals("OK", channel.receive())
+ finish(6)
+ }
+ yield() // to launched coroutine
+ expect(3)
+ select<Unit> {
+ channel.onSend("OK") {
+ expect(4)
+ }
+ default {
+ expectUnreached()
+ }
+ }
+ expect(5)
+ }
+
+ @Test
+ fun testSelectSendWaitWithDefault() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ select<Unit> {
+ channel.onSend("OK") {
+ expectUnreached()
+ }
+ default {
+ expect(2)
+ }
+ }
+ expect(3)
+ // make sure receive blocks (select above is over)
+ launch(context) {
+ expect(5)
+ assertEquals("CHK", channel.receive())
+ finish(8)
+ }
+ expect(4)
+ yield()
+ expect(6)
+ channel.send("CHK")
+ expect(7)
+ }
+
+ @Test
+ fun testSelectSendWait() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ launch(context) {
+ expect(3)
+ assertEquals("OK", channel.receive())
+ expect(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onSend("OK") {
+ expect(5)
+ }
+ }
+ finish(6)
+ }
+
+ @Test
+ fun testSelectReceiveSuccess() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ launch(context) {
+ expect(2)
+ channel.send("OK")
+ finish(6)
+ }
+ yield() // to launched coroutine
+ expect(3)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(4)
+ assertEquals("OK", v)
+ }
+ }
+ expect(5)
+ }
+
+ @Test
+ fun testSelectReceiveSuccessWithDefault() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ launch(context) {
+ expect(2)
+ channel.send("OK")
+ finish(6)
+ }
+ yield() // to launched coroutine
+ expect(3)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(4)
+ assertEquals("OK", v)
+ }
+ default {
+ expectUnreached()
+ }
+ }
+ expect(5)
+ }
+
+ @Test
+ fun testSelectReceiveWaitWithDefault() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ select<Unit> {
+ channel.onReceive { v ->
+ expectUnreached()
+ }
+ default {
+ expect(2)
+ }
+ }
+ expect(3)
+ // make sure send blocks (select above is over)
+ launch(context) {
+ expect(5)
+ channel.send("CHK")
+ finish(8)
+ }
+ expect(4)
+ yield()
+ expect(6)
+ assertEquals("CHK", channel.receive())
+ expect(7)
+ }
+
+ @Test
+ fun testSelectReceiveWait() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ launch(context) {
+ expect(3)
+ channel.send("OK")
+ expect(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expect(5)
+ assertEquals("OK", v)
+ }
+ }
+ finish(6)
+ }
+
+ @Test(expected = ClosedReceiveChannelException::class)
+ fun testSelectReceiveClosed() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ channel.close()
+ finish(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expectUnreached()
+ }
+ }
+ expectUnreached()
+ }
+
+ @Test(expected = ClosedReceiveChannelException::class)
+ fun testSelectReceiveWaitClosed() = runBlocking<Unit> {
+ expect(1)
+ val channel = RendezvousChannel<String>()
+ launch(context) {
+ expect(3)
+ channel.close()
+ finish(4)
+ }
+ expect(2)
+ select<Unit> {
+ channel.onReceive { v ->
+ expectUnreached()
+ }
+ }
+ expectUnreached()
+ }
+
+ @Test
+ fun testSelectSendResourceCleanup() = runBlocking<Unit> {
+ val channel = RendezvousChannel<Int>()
+ val n = 10_000_000
+ expect(1)
+ repeat(n) { i ->
+ select {
+ channel.onSend(i) { expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ @Test
+ fun testSelectReceiveResourceCleanup() = runBlocking<Unit> {
+ val channel = RendezvousChannel<Int>()
+ val n = 10_000_000
+ expect(1)
+ repeat(n) { i ->
+ select {
+ channel.onReceive { v -> expectUnreached() }
+ default { expect(i + 2) }
+ }
+ }
+ finish(n + 2)
+ }
+
+ @Test
+ fun testSelectAtomicFailure() = runBlocking<Unit> {
+ val c1 = RendezvousChannel<Int>()
+ val c2 = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ val res = select<String> {
+ c1.onReceive { v1 ->
+ expect(4)
+ assertEquals(42, v1)
+ yield() // back to main
+ expect(7)
+ "OK"
+ }
+ c2.onReceive {
+ "FAIL"
+ }
+ }
+ expect(8)
+ assertEquals("OK", res)
+ }
+ expect(2)
+ c1.send(42) // send to coroutine, suspends
+ expect(5)
+ c2.close() // makes sure that selected expression does not fail!
+ expect(6)
+ yield() // back
+ finish(9)
+ }
+
+ @Test
+ fun testSelectWaitDispatch() = runBlocking<Unit> {
+ val c = RendezvousChannel<Int>()
+ expect(1)
+ launch(context) {
+ expect(3)
+ val res = select<String> {
+ c.onReceive { v ->
+ expect(6)
+ assertEquals(42, v)
+ yield() // back to main
+ expect(8)
+ "OK"
+ }
+ }
+ expect(9)
+ assertEquals("OK", res)
+ }
+ expect(2)
+ yield() // to launch
+ expect(4)
+ c.send(42) // do not suspend
+ expect(5)
+ yield() // to receive
+ expect(7)
+ yield() // again
+ finish(10)
+ }
+
+ // only for debugging
+ internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
+ this as SelectBuilderImpl // type assertion
+ if (!trySelect(null)) return
+ block.startUndispatchedCoroutine(this)
+ }
+}
\ No newline at end of file