run function is cancellable by default and accepts optional CoroutineStart
diff --git a/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
index 56726ac..5293be1 100644
--- a/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
+++ b/integration/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
@@ -154,6 +154,7 @@
replaceWith = ReplaceWith("asCompletableFuture()"))
public fun <T> Deferred<T>.toCompletableFuture(): CompletableFuture<T> = asCompletableFuture()
+/** @suppress **Deprecated** */
@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
@Deprecated("Use the other version. This one is for binary compatibility only.", level=DeprecationLevel.HIDDEN)
public fun <T> future(
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 6b8ca8b..3528af6 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -77,31 +77,52 @@
* This function immediately applies dispatcher from the new context, shifting execution of the block into the
* different thread inside the block, and back when it completes.
* The specified [context] is added onto the current coroutine context for the execution of the block.
+ *
+ * An optional `start` parameter is used only if the specified `context` uses a different [CoroutineDispatcher] than
+ * a current one, otherwise it is ignored.
+ * By default, the coroutine is immediately scheduled for execution and can be cancelled
+ * while it is waiting to be executed and it can be cancelled while the result is scheduled
+ * to be be processed by the invoker context.
+ * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
+ * A value of [CoroutineStart.LAZY] is not supported and produces [IllegalArgumentException].
*/
-public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
- suspendCoroutineOrReturn sc@ { cont ->
- val oldContext = cont.context
- // fast path #1 if there is no change in the actual context:
- if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
- return@sc block.startCoroutineUninterceptedOrReturn(cont)
- // compute new context
- val newContext = oldContext + context
- // fast path #2 if the result is actually the same
- if (newContext === oldContext)
- return@sc block.startCoroutineUninterceptedOrReturn(cont)
- // fast path #3 if the new dispatcher is the same as the old one.
- // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
- if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
- val newContinuation = RunContinuationDirect(newContext, cont)
- return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
- }
- // slowest path otherwise -- use new interceptor, sync to its result via a
- // full-blown instance of CancellableContinuation
- val newContinuation = RunContinuationCoroutine(newContext, cont)
- newContinuation.initCancellability()
- block.startCoroutine(newContinuation)
- newContinuation.getResult()
+public suspend fun <T> run(
+ context: CoroutineContext,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ block: suspend () -> T
+): T = suspendCoroutineOrReturn sc@ { cont ->
+ val oldContext = cont.context
+ // fast path #1 if there is no change in the actual context:
+ if (context === oldContext || context is CoroutineContext.Element && oldContext[context.key] === context)
+ return@sc block.startCoroutineUninterceptedOrReturn(cont)
+ // compute new context
+ val newContext = oldContext + context
+ // fast path #2 if the result is actually the same
+ if (newContext === oldContext)
+ return@sc block.startCoroutineUninterceptedOrReturn(cont)
+ // fast path #3 if the new dispatcher is the same as the old one.
+ // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
+ if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
+ val newContinuation = RunContinuationDirect(newContext, cont)
+ return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
}
+ // slowest path otherwise -- use new interceptor, sync to its result via a
+ // full-blown instance of CancellableContinuation
+ require(!start.isLazy) { "$start start is not supported" }
+ val newContinuation = RunContinuationCoroutine(
+ parentContext = newContext,
+ resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE,
+ continuation = cont)
+ newContinuation.initCancellability() // attach to parent job
+ start(block, newContinuation)
+ newContinuation.getResult()
+}
+
+/** @suppress **Deprecated** */
+@Suppress("DeprecatedCallableAddReplaceWith") // todo: the warning is incorrectly shown, see KT-17917
+@Deprecated(message = "It is here for binary compatibility only", level=DeprecationLevel.HIDDEN)
+public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
+ run(context, start = CoroutineStart.ATOMIC, block = block)
/**
* Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
@@ -157,8 +178,9 @@
private class RunContinuationCoroutine<in T>(
override val parentContext: CoroutineContext,
+ resumeMode: Int,
continuation: Continuation<T>
-) : CancellableContinuationImpl<T>(continuation, defaultResumeMode = MODE_CANCELLABLE, active = true)
+) : CancellableContinuationImpl<T>(continuation, defaultResumeMode = resumeMode, active = true)
private class BlockingCoroutine<T>(
override val parentContext: CoroutineContext,
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt
index 1563fa0..b824964 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineStart.kt
@@ -16,6 +16,7 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.CoroutineStart.*
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.startCoroutine
@@ -74,10 +75,32 @@
* Immediately executes coroutine until its first suspension point _in the current thread_ as if it the
* coroutine was started using [Unconfined] dispatcher. However, when coroutine is resumed from suspension
* it is dispatched according to the [CoroutineDispatcher] in its context.
+ *
+ * This is similar to [ATOMIC] in the sense that coroutine starts executing even if it was already cancelled,
+ * but the difference is that it start executing in the same thread.
+ *
+ * Cancellability of coroutine at suspension points depends on the particular implementation details of
+ * suspending functions as in [DEFAULT].
*/
UNDISPATCHED;
/**
+ * Starts the corresponding block as a coroutine with this coroutine start strategy.
+ *
+ * * [DEFAULT] uses [startCoroutineCancellable].
+ * * [ATOMIC] uses [startCoroutine].
+ * * [UNDISPATCHED] uses [startCoroutineUndispatched].
+ * * [LAZY] does nothing.
+ */
+ public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
+ when (this) {
+ CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
+ CoroutineStart.ATOMIC -> block.startCoroutine(completion)
+ CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
+ CoroutineStart.LAZY -> Unit // will start lazily
+ }
+
+ /**
* Starts the corresponding block with receiver as a coroutine with this coroutine start strategy.
*
* * [DEFAULT] uses [startCoroutineCancellable].
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt
index 31a388d..5a6faff 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/RunTest.kt
@@ -19,6 +19,8 @@
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.core.IsEqual
import org.junit.Test
+import kotlin.coroutines.experimental.ContinuationInterceptor
+import kotlin.coroutines.experimental.CoroutineContext
class RunTest : TestBase() {
@Test
@@ -124,4 +126,46 @@
assertThat(result, IsEqual("OK"))
finish(4)
}
+
+ @Test(expected = CancellationException::class)
+ fun testRunCancellableDefault() = runBlocking<Unit> {
+ val job = Job()
+ job.cancel() // cancel before it has a chance to run
+ run(job + wrapperDispatcher(context)) {
+ expectUnreached() // will get cancelled
+ }
+ }
+
+ @Test(expected = CancellationException::class)
+ fun testRunAtomicTryCancel() = runBlocking<Unit> {
+ expect(1)
+ val job = Job()
+ job.cancel() // try to cancel before it has a chance to run
+ run(job + wrapperDispatcher(context), CoroutineStart.ATOMIC) { // but start atomically
+ finish(2)
+ yield() // but will cancel here
+ expectUnreached()
+ }
+ }
+
+ @Test(expected = CancellationException::class)
+ fun testRunUndispatchedTryCancel() = runBlocking<Unit> {
+ expect(1)
+ val job = Job()
+ job.cancel() // try to cancel before it has a chance to run
+ run(job + wrapperDispatcher(context), CoroutineStart.UNDISPATCHED) { // but start atomically
+ finish(2)
+ yield() // but will cancel here
+ expectUnreached()
+ }
+ }
+
+ private fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
+ val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
+ return object : CoroutineDispatcher() {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ dispatcher.dispatch(context, block)
+ }
+ }
+ }
}
\ No newline at end of file