Structured concurrency implementation:
* Introducing async, launch, produce, actor and broadcast extensions on CoroutineScope
* Deprecate top-level coroutine builders
* Introducing currentScope and coroutineScope for manipulation with CoroutineScope interface
* Introducing CoroutineScope factories
* Introducing extension CoroutineScope.isActive
Fixes #410
diff --git a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
index 6d7874b..d88f23b 100644
--- a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
@@ -36,7 +36,9 @@
@Suppress("LeakingThis")
public final override val context: CoroutineContext = parentContext + this
@Deprecated("Replaced with context", replaceWith = ReplaceWith("context"))
- public final override val coroutineContext: CoroutineContext get() = context
+ public override val coroutineContext: CoroutineContext get() = context
+
+ override val isActive: Boolean get() = super<JobSupport>.isActive
/**
* Initializes parent job from the `parentContext` of this coroutine that was passed to it during construction.
diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
index e5745b9..0ba7341 100644
--- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
@@ -18,17 +18,13 @@
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
- * of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with the corresponding element of the [context] argument.
*
* By default, the coroutine is immediately scheduled for execution.
- * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
+ * Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
* the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
* and will be started implicitly on the first invocation of [join][Job.join].
@@ -39,20 +35,18 @@
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param context additional context elements for the coroutine, added to [CoroutineScope.coroutineContext].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
* @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
- * @param block the coroutine code.
- */
-public fun launch(
- context: CoroutineContext = DefaultDispatcher,
+ * @param block the coroutine code which will be invoked in the context of the provided scope
+ **/
+public fun CoroutineScope.launch(
+ context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend CoroutineScope.() -> Unit
): Job {
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
@@ -60,6 +54,40 @@
coroutine.start(start, coroutine, block)
return coroutine
}
+
+/**
+ * Launches a new coroutine in [GlobalScope] without blocking the current thread and returns a reference to it as a [Job].
+ * @suppress **Deprecated** Use [CoroutineScope.launch] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.launch(context, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun launch(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> Unit
+): Job =
+ GlobalScope.launch(context, start, onCompletion, block)
+
+/**
+ * Launches a new coroutine in [GlobalScope] without blocking the current thread and returns a reference to it as a [Job].
+ * @suppress **Deprecated** Use [CoroutineScope.launch] instead; a non-null [parent] is added to [context], a `null` one is ignored.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead. This API will be hidden in the next release",
+ replaceWith = ReplaceWith("GlobalScope.launch(context + parent, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun launch(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ parent: Job? = null, // nullable for binary compatibility
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> Unit
+): Job =
+ GlobalScope.launch(context + (parent ?: EmptyCoroutineContext), start, onCompletion, block)
+
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun launch(
@@ -67,7 +95,8 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
-): Job = launch(context, start, parent, block = block)
+): Job =
+ GlobalScope.launch(context + (parent ?: EmptyCoroutineContext), start, block = block)
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
@@ -84,7 +113,7 @@
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
replaceWith = ReplaceWith("launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun launch(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> Unit): Job =
- launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
+ GlobalScope.launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
/**
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
@@ -105,6 +134,19 @@
public suspend fun <T> withContext(
context: CoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
+ block: suspend CoroutineScope.() -> T
+): T =
+ // todo: optimize fast-path to work without allocation (when there is a already a coroutine implementing scope)
+ withContextImpl(context, start) {
+ currentScope {
+ block()
+ }
+ }
+
+// todo: optimize it to reduce allocations
+private suspend fun <T> withContextImpl(
+ context: CoroutineContext,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val oldContext = uCont.context
@@ -137,6 +179,16 @@
completion.getResult()
}
+/** @suppress **Deprecated**: Binary compatibility; keeps the JVM name `withContext` for the overload taking a `suspend () -> T` block. */
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
+@JvmName("withContext")
+public suspend fun <T> withContext0(
+ context: CoroutineContext,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ block: suspend () -> T
+): T =
+ withContextImpl(context, start, block)
+
/** @suppress **Deprecated**: Renamed to [withContext]. */
@Deprecated(message = "Renamed to `withContext`", level=DeprecationLevel.WARNING,
replaceWith = ReplaceWith("withContext(context, start, block)"))
@@ -145,12 +197,12 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend () -> T
): T =
- withContext(context, start, block)
+ withContextImpl(context, start, block)
/** @suppress **Deprecated** */
@Deprecated(message = "It is here for binary compatibility only", level=DeprecationLevel.HIDDEN)
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
- withContext(context, start = CoroutineStart.ATOMIC, block = block)
+ withContextImpl(context, start = CoroutineStart.ATOMIC, block = block)
// --------------- implementation ---------------
diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
index 1f84a0f..1057236 100644
--- a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
+++ b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
@@ -4,12 +4,51 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.internal.*
import kotlin.coroutines.experimental.*
-import kotlin.internal.*
/**
- * Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient
- * and fast access to its own cancellation status via [isActive].
+ * Defines a scope for new coroutines. Every coroutine builder
+ * is an extension on [CoroutineScope] and inherits its [coroutineContext][CoroutineScope.coroutineContext]
+ * to automatically propagate both context elements and cancellation.
+ *
+ * [CoroutineScope] should be implemented on entities with well-defined lifecycle that are responsible
+ * for launching children coroutines. Example of such entity on Android is Activity.
+ * Usage of this interface may look like this:
+ *
+ * ```
+ * class MyActivity : AppCompatActivity(), CoroutineScope {
+ *     lateinit var job: Job
+ * override val coroutineContext: CoroutineContext
+ * get() = job + UI
+ *
+ * override fun onCreate(savedInstanceState: Bundle?) {
+ * super.onCreate(savedInstanceState)
+ * job = Job()
+ * }
+ *
+ * override fun onDestroy() {
+ * super.onDestroy()
+ * job.cancel() // Cancel job on activity destroy. After destroy all children jobs will be cancelled automatically
+ * }
+ *
+ * /*
+ * * Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
+ * * in this method throws an exception, then all nested coroutines will be cancelled.
+ * */
+ * fun loadDataFromUI() = launch { // <- extension on current activity, launched in CommonPool
+ * val ioData = async(IO) { // <- extension on launch scope, launched in IO dispatcher
+ * // long computation
+ * }
+ *
+ * withContext(UI) {
+ * val data = ioData.await()
+ * draw(data)
+ * }
+ * }
+ * }
+ *
+ * ```
*/
public interface CoroutineScope {
/**
@@ -26,18 +65,140 @@
* [CoroutineScope] is available.
* See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
* [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
+ *
+ * @suppress **Deprecated**: Deprecated in favor of top-level extension property
*/
+ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Deprecated in favor of top-level extension property")
public val isActive: Boolean
+ get() = coroutineContext[Job]?.isActive ?: true
/**
- * Returns the context of this coroutine.
- *
- * @suppress: **Deprecated**: Replaced with top-level [kotlin.coroutines.experimental.coroutineContext].
+ * Returns the context of this scope.
*/
- @Deprecated("Replace with top-level coroutineContext",
- replaceWith = ReplaceWith("coroutineContext",
- imports = ["kotlin.coroutines.experimental.coroutineContext"]))
- @LowPriorityInOverloadResolution
- @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
public val coroutineContext: CoroutineContext
-}
\ No newline at end of file
+}
+
+/**
+ * Returns a new scope whose context is this scope's [coroutineContext][CoroutineScope.coroutineContext]
+ * combined with the specified [context]; elements of [context] override this scope's elements with the same keys.
+ *
+ * This is a shorthand for `CoroutineScope(thisScope.coroutineContext + context)`.
+ */
+public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
+    CoroutineScope(coroutineContext + context)
+
+/**
+ * Returns `true` when the [Job] in this scope's context is still active (has not completed and was not cancelled yet); a scope whose context has no [Job] is always reported as active.
+ *
+ * Check this property in long-running computation loops to support cancellation:
+ * ```
+ * while (isActive) {
+ *     // do some computation
+ * }
+ * ```
+ *
+ * This property is a shortcut for `coroutineContext.isActive` in the scope when
+ * [CoroutineScope] is available.
+ * See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
+ * [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
+ */
+@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
+public val CoroutineScope.isActive: Boolean
+ get() = coroutineContext[Job]?.isActive ?: true
+
+/**
+ * A global [CoroutineScope] not bound to any job.
+ *
+ * Global scope is used to launch top-level coroutines which are operating on the whole application lifetime
+ * and are not cancelled prematurely.
+ * Another use of the global scope is [Unconfined] operators, which don't have any job associated with them.
+ *
+ * Application code should usually use an application-defined [CoroutineScope]; using [async] or [launch]
+ * on the instance of [GlobalScope] is highly discouraged.
+ *
+ * Usage of this object may look like this:
+ *
+ * ```
+ * fun ReceiveChannel<Int>.sqrt(): ReceiveChannel<Double> = GlobalScope.produce(Unconfined) {
+ *     for (number in this@sqrt) {
+ *         send(Math.sqrt(number.toDouble()))
+ *     }
+ * }
+ *
+ * ```
+ */
+object GlobalScope : CoroutineScope {
+ /**
+ * @suppress **Deprecated**: Deprecated in favor of top-level extension property
+ */
+ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Deprecated in favor of top-level extension property")
+ override val isActive: Boolean
+ get() = true
+
+ /**
+ * Returns [EmptyCoroutineContext].
+ */
+ override val coroutineContext: CoroutineContext
+ get() = EmptyCoroutineContext
+}
+
+/**
+ * Creates new [CoroutineScope] and calls the specified suspend block with this scope.
+ * The provided scope inherits its [coroutineContext][CoroutineScope.coroutineContext] from the outer scope, but overrides
+ * context's [Job].
+ *
+ * This method returns as soon as the given block and all child coroutines launched from within the scope have completed.
+ * An example of scope usage looks like this:
+ *
+ * ```
+ * suspend fun loadDataForUI() = coroutineScope {
+ *
+ *     val data = async { // <- extension on current scope
+ *         ... load some UI data ...
+ *     }
+ *
+ *     withContext(UI) {
+ *         doSomeWork()
+ *         val result = data.await()
+ *         display(result)
+ *     }
+ * }
+ * ```
+ *
+ * Semantics of the scope in this example:
+ * 1) `loadDataForUI` returns as soon as data is loaded and UI is updated.
+ * 2) If `doSomeWork` throws an exception, then `async` task is cancelled and `loadDataForUI` rethrows that exception.
+ * 3) If outer scope of `loadDataForUI` is cancelled, both started `async` and `withContext` are cancelled.
+ *
+ * Method may throw [JobCancellationException] if the current job was cancelled externally
+ * or may throw the corresponding unhandled [Throwable] if there is any unhandled exception in this scope
+ * (for example, from a crashed coroutine that was started with [launch] in this scope).
+ */
+public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
+ // todo: optimize implementation to a single allocated object
+ val owner = ScopeOwnerCoroutine<R>(coroutineContext)
+ owner.start(CoroutineStart.UNDISPATCHED, owner, block)
+ owner.join()
+ if (owner.isCancelled) {
+ throw owner.getCancellationException().let { it.cause ?: it } // prefer the underlying cause; fall back to the cancellation exception itself
+ }
+ val state = owner.state
+ if (state is CompletedExceptionally) {
+ throw state.cause // rethrow the failure recorded in the final state
+ }
+ @Suppress("UNCHECKED_CAST")
+ return state as R // not a CompletedExceptionally here, so the state itself is returned as the result
+}
+
+/**
+ * Runs the given [block] with a [CoroutineScope] that wraps the current [coroutineContext].
+ * Note, this method doesn't wait for all launched children to complete (as opposed to [coroutineScope]).
+ */
+public suspend inline fun <R> currentScope(block: CoroutineScope.() -> R): R =
+ CoroutineScope(coroutineContext).block()
+
+/**
+ * Creates a [CoroutineScope] that wraps the given [context].
+ */
+@Suppress("FunctionName")
+public fun CoroutineScope(context: CoroutineContext): CoroutineScope = ContextScope(context)
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/Deferred.kt b/common/kotlinx-coroutines-core-common/src/Deferred.kt
index 4fe58a9..200fda8 100644
--- a/common/kotlinx-coroutines-core-common/src/Deferred.kt
+++ b/common/kotlinx-coroutines-core-common/src/Deferred.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
@@ -125,16 +126,13 @@
/**
* Creates new coroutine and returns its future result as an implementation of [Deferred].
*
- * The running coroutine is cancelled when the resulting object is [cancelled][Job.cancel].
+ * The running coroutine is cancelled when the resulting deferred is [cancelled][Job.cancel].
+ * Parent of the created coroutine is inherited from the provided [CoroutineScope].
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
- * of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with the corresponding element of the [context] argument.
*
* By default, the coroutine is immediately scheduled for execution.
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
@@ -142,20 +140,18 @@
* the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
* function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll].
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param context additional context elements for the coroutine, added to [CoroutineScope.coroutineContext].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
* @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
-public fun <T> async(
- context: CoroutineContext = DefaultDispatcher,
+public fun <T> CoroutineScope.async(
+ context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
@@ -164,6 +160,39 @@
return coroutine
}
+/**
+ * Creates a new coroutine in [GlobalScope] and returns its future result as an implementation of [Deferred].
+ * @suppress **Deprecated**. Use [CoroutineScope.async] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.async(context, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun <T> async(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> T
+): Deferred<T> =
+ GlobalScope.async(context, start, onCompletion, block)
+
+/**
+ * Creates a new coroutine in [GlobalScope] and returns its future result as an implementation of [Deferred].
+ * @suppress **Deprecated**. Use [CoroutineScope.async] instead; a non-null [parent] is added to [context], a `null` one is ignored.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.async(context + parent, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun <T> async(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> T
+): Deferred<T> =
+ GlobalScope.async(context + (parent ?: EmptyCoroutineContext), start, onCompletion, block)
+
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> async(
@@ -171,7 +200,8 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> T
-): Deferred<T> = async(context, start, parent, block = block)
+): Deferred<T> =
+ GlobalScope.async(context + (parent ?: EmptyCoroutineContext), start, block = block)
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
@@ -180,7 +210,7 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> =
- async(context, start, block = block)
+ GlobalScope.async(context, start, block = block)
/**
* @suppress **Deprecated**: Use `start = CoroutineStart.XXX` parameter
@@ -188,7 +218,7 @@
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
replaceWith = ReplaceWith("async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun <T> async(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> T): Deferred<T> =
- async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
+ GlobalScope.async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
/**
* @suppress **Deprecated**: `defer` was renamed to `async`.
@@ -196,7 +226,7 @@
@Deprecated(message = "`defer` was renamed to `async`", level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("async(context, block = block)"))
public fun <T> defer(context: CoroutineContext, block: suspend CoroutineScope.() -> T): Deferred<T> =
- async(context, block = block)
+ GlobalScope.async(context, block = block)
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index ea024ed..4c8945d 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -132,7 +132,7 @@
}
}
- public final override val isActive: Boolean get() {
+ public override val isActive: Boolean get() {
val state = this.state
return state is Incomplete && state.isActive
}
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
index f55b521..0f6b418 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
@@ -7,6 +7,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlin.coroutines.experimental.*
@@ -27,6 +28,29 @@
}
/**
+ * Launches new coroutine to produce a stream of values by sending them to a broadcast channel.
+ * Deprecated, use [CoroutineScope.broadcast] instead.
+ */
+@Deprecated(message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead. This API will be hidden in the next release")
+public fun <E> broadcast(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 1,
+ start: CoroutineStart = CoroutineStart.LAZY,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend ProducerScope<E>.() -> Unit
+): BroadcastChannel<E> {
+ val channel = BroadcastChannel<E>(capacity)
+ val newContext = newCoroutineContext(context, parent)
+ val coroutine = if (start.isLazy)
+ LazyBroadcastCoroutine(newContext, channel, block) else
+ BroadcastCoroutine(newContext, channel, active = true)
+ if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
+ coroutine.start(start, coroutine, block)
+ return coroutine
+}
+
+/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
@@ -36,13 +60,10 @@
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with the corresponding element of the [context] argument.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
@@ -62,20 +83,18 @@
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
-public fun <E> broadcast(
- context: CoroutineContext = DefaultDispatcher,
+public fun <E> CoroutineScope.broadcast(
+ context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> {
+ val newContext = newCoroutineContext(context)
val channel = BroadcastChannel<E>(capacity)
- val newContext = newCoroutineContext(context, parent)
val coroutine = if (start.isLazy)
LazyBroadcastCoroutine(newContext, channel, block) else
BroadcastCoroutine(newContext, channel, active = true)
@@ -89,6 +108,9 @@
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
+
+ override val isActive: Boolean get() = super<AbstractCoroutine>.isActive
+
override val channel: SendChannel<E>
get() = this
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt b/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt
index 3ad8f8e..be51b29 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt
@@ -19,7 +19,7 @@
* Returns a channel to read all element of the [Iterable].
*/
public fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context) {
+ GlobalScope.produce(context) {
for (element in this@asReceiveChannel)
send(element)
}
@@ -28,7 +28,7 @@
* Returns a channel to read all element of the [Sequence].
*/
public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context) {
+ GlobalScope.produce(context) {
for (element in this@asReceiveChannel)
send(element)
}
@@ -499,7 +499,7 @@
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
require(n >= 0) { "Requested element count $n is less than zero." }
var remaining: Int = n
if (remaining > 0)
@@ -521,7 +521,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@dropWhile) {
if (!predicate(e)) {
send(e)
@@ -541,7 +541,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@filter) {
if (predicate(e)) send(e)
}
@@ -557,7 +557,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
var index = 0
for (e in this@filterIndexed) {
if (predicate(index++, e)) send(e)
@@ -705,7 +705,7 @@
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
if (n == 0) return@produce
require(n >= 0) { "Requested element count $n is less than zero." }
var remaining: Int = n
@@ -725,7 +725,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@takeWhile) {
if (!predicate(e)) return@produce
send(e)
@@ -909,7 +909,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@flatMap) {
transform(e).toChannel(this)
}
@@ -986,7 +986,7 @@
*/
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
consumeEach {
send(transform(it))
}
@@ -1003,7 +1003,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
var index = 0
for (e in this@mapIndexed) {
send(transform(index++, e))
@@ -1163,7 +1163,7 @@
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Unconfined): ReceiveChannel<IndexedValue<E>> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
var index = 0
for (e in this@withIndex) {
send(IndexedValue(index++, e))
@@ -1192,7 +1192,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
val keys = HashSet<K>()
for (e in this@distinctBy) {
val k = selector(e)
@@ -1530,7 +1530,7 @@
*/
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
- produce(context, onCompletion = consumesAll(this, other)) {
+ GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
val otherIterator = other.iterator()
this@zip.consumeEach { element1 ->
if (!otherIterator.hasNext()) return@consumeEach
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
index 4e9510c..8d4e550 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
@@ -6,6 +6,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.internal.*
import kotlin.coroutines.experimental.*
/**
@@ -34,6 +35,27 @@
/**
* Launches new coroutine to produce a stream of values by sending them to a channel
+ * and returns a reference to the coroutine as a [ReceiveChannel].
+ * Deprecated, use [CoroutineScope.produce]
+ */
+@Deprecated(message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead. This API will be hidden in the next release")
+public fun <E> produce(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 0,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> {
+ val channel = Channel<E>(capacity)
+ val newContext = newCoroutineContext(context, parent)
+ val coroutine = ProducerCoroutine(newContext, channel)
+ if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ return coroutine
+}
+
+/**
+ * Launches new coroutine to produce a stream of values by sending them to a channel
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
* object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
*
@@ -43,13 +65,10 @@
* when the coroutine completes.
* The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with a corresponding [Job] element in the [context] argument.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
@@ -65,19 +84,17 @@
*
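- * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param context additional context elements of the coroutine. The default value is [EmptyCoroutineContext].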
* @param capacity capacity of the channel's buffer (no buffer by default).
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
-public fun <E> produce(
- context: CoroutineContext = DefaultDispatcher,
+public fun <E> CoroutineScope.produce(
+ context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(capacity)
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@@ -116,6 +133,10 @@
private class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E> {
+
+ override val isActive: Boolean
+ get() = super<ChannelCoroutine>.isActive
+
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
val cause = exceptionally?.cause
val processed = when (exceptionally) {
diff --git a/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
new file mode 100644
index 0000000..ee51615
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal class ScopeOwnerCoroutine<R>(
+ parentContext: CoroutineContext
+) : AbstractCoroutine<R>(parentContext, true), CoroutineScope {
+
+ override val coroutineContext: CoroutineContext = parentContext + this
+
+ /*
+     * Always returns true, whatever super.cancel reports, so that the final exception is in the scope before its completion.
+ */
+ override fun cancel(cause: Throwable?): Boolean {
+ super.cancel(cause)
+ return true
+ }
+}
+
+internal class ContextScope(context: CoroutineContext) : CoroutineScope {
+ override val coroutineContext: CoroutineContext = context
+}
+
+internal fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext =
+ newCoroutineContext(coroutineContext + context, parent = null)
diff --git a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
index de7e004..0496904 100644
--- a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
@@ -6,7 +6,6 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class AsyncTest : TestBase() {
@@ -221,5 +220,22 @@
finish(2)
}
+ @Test
+ fun testOverriddenParent() = runTest {
+ val parent = Job()
+ val deferred = async(parent, CoroutineStart.ATOMIC) {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ parent.cancel()
+ try {
+ expect(1)
+ deferred.await()
+ } catch (e: JobCancellationException) {
+ finish(3)
+ }
+ }
+
private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
new file mode 100644
index 0000000..22652f2
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+class CoroutineScopeTest : TestBase() {
+
+ @Test
+ fun testScope() = runTest {
+ suspend fun callJobScoped() = coroutineScope {
+ expect(2)
+
+ launch {
+ expect(4)
+ }
+
+ launch {
+ expect(5)
+
+ launch {
+ expect(7)
+ }
+
+ expect(6)
+
+ }
+
+ expect(3)
+ 42
+ }
+
+
+ expect(1)
+ val result = callJobScoped()
+ assertEquals(42, result)
+ yield() // Check we're not cancelled
+ finish(8)
+ }
+
+ @Test
+ fun testScopeCancelledFromWithin() = runTest {
+ expect(1)
+ suspend fun callJobScoped() = coroutineScope {
+
+ launch {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ launch {
+ expect(3)
+ throw IllegalArgumentException()
+ }
+ }
+
+ try {
+ callJobScoped()
+ expectUnreached()
+ } catch (e: IllegalArgumentException) {
+ expect(4)
+ }
+
+ yield() // Check we're not cancelled
+ finish(5)
+ }
+
+ @Test
+ fun testScopeBlockThrows() = runTest {
+ expect(1)
+ suspend fun callJobScoped(): Unit = coroutineScope {
+
+ launch {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ yield() // let launch sleep
+ throw NotImplementedError()
+ }
+
+ try {
+ callJobScoped()
+ expectUnreached()
+ } catch (e: NotImplementedError) {
+ expect(3)
+ }
+
+ yield() // Check we're not cancelled
+ finish(4)
+ }
+
+ @Test
+ fun testOuterJobIsCancelled() = runTest {
+
+ suspend fun callJobScoped() = coroutineScope {
+
+ launch {
+ expect(3)
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ expect(4)
+ }
+ }
+
+ expect(2)
+ delay(Long.MAX_VALUE)
+ 42
+ }
+
+
+ val outerJob = launch(coroutineContext.minusKey(Job)) {
+ expect(1)
+ try {
+ callJobScoped()
+ expectUnreached()
+ } catch (e: JobCancellationException) {
+ expect(5)
+ assertNull(e.cause)
+ }
+ }
+
+        repeat(3) { yield() } // let everything start properly
+ outerJob.cancel()
+ outerJob.join()
+ finish(6)
+ }
+
+ @Test
+ @Suppress("UNREACHABLE_CODE")
+ fun testDocumentationExample() = runTest {
+ suspend fun loadData() = coroutineScope {
+ expect(1)
+ val data = async {
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ expect(3)
+ }
+ }
+
+ yield()
+
+ // UI updater
+ withContext(coroutineContext) {
+ expect(2)
+ throw AssertionError()
+ data.await() // Actually unreached
+ expectUnreached()
+ }
+ }
+
+
+ try {
+ loadData()
+ expectUnreached()
+ } catch (e: AssertionError) {
+ finish(4)
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
index 70d89ca..283a6d4 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
@@ -6,7 +6,6 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class CoroutinesTest : TestBase() {
diff --git a/common/kotlinx-coroutines-core-common/test/CurrentScopeTest.kt b/common/kotlinx-coroutines-core-common/test/CurrentScopeTest.kt
new file mode 100644
index 0000000..db17064
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/CurrentScopeTest.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+class CurrentScopeTest : TestBase() {
+
+ @Test
+ fun testScope() = runTest {
+ suspend fun callJobScoped() = currentScope {
+ launch {
+ finish(3)
+ }
+ }
+
+
+ expect(1)
+ callJobScoped()
+ expect(2)
+ }
+
+ @Test
+ fun testNestedScope() = runTest {
+ suspend fun callJobScoped() = currentScope {
+ launch {
+ expect(2)
+ }
+ }
+
+ expect(1)
+ coroutineScope {
+ callJobScoped()
+ }
+
+ finish(3)
+ }
+
+ @Test
+ fun testThrowException() = runTest(expected = { it is IndexOutOfBoundsException }) {
+ suspend fun callJobScoped() = currentScope {
+ launch {
+ finish(3)
+ throw IndexOutOfBoundsException()
+ }
+ }
+
+ expect(1)
+ callJobScoped()
+ expect(2)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/test/JobTest.kt b/common/kotlinx-coroutines-core-common/test/JobTest.kt
index 1beccb8..f83a893 100644
--- a/common/kotlinx-coroutines-core-common/test/JobTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/JobTest.kt
@@ -4,7 +4,6 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class JobTest : TestBase() {
@@ -175,4 +174,18 @@
job.cancelAndJoin()
finish(4)
}
+
+ @Test
+ fun testOverriddenParent() = runTest {
+ val parent = Job()
+ val deferred = launch(parent, CoroutineStart.ATOMIC) {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ parent.cancel()
+ expect(1)
+ deferred.join()
+ finish(3)
+ }
}
diff --git a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
index 9e40c1b..bed85d9 100644
--- a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
@@ -145,8 +145,7 @@
try {
withContext(job + wrapperDispatcher(coroutineContext), CoroutineStart.ATOMIC) {
- require(isActive)
- // but start atomically
+ require(!isActive) // not active, but the block has still started because the start is atomic
expect(2)
yield() // but will cancel here
expectUnreached()
@@ -165,7 +164,7 @@
val job = Job()
job.cancel() // try to cancel before it has a chance to run
withContext(job + wrapperDispatcher(coroutineContext), CoroutineStart.UNDISPATCHED) { // but start atomically
- require(isActive)
+ require(!isActive) // not active, but the block has still started because the start is undispatched
finish(2)
yield() // but will cancel here
expectUnreached()
diff --git a/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt b/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt
index 0f0d059..74bdb44 100644
--- a/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt
@@ -12,7 +12,7 @@
@Test
fun testBroadcastBasic() = runTest {
expect(1)
- val b = broadcast(coroutineContext) {
+ val b = broadcast {
expect(4)
send(1) // goes to receiver
expect(5)
diff --git a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
index 70252a7..40b4e5b 100644
--- a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
@@ -12,7 +12,7 @@
@Test
fun testBasic() = runTest {
- val c = produce(coroutineContext) {
+ val c = produce {
expect(2)
send(1)
expect(3)
@@ -30,7 +30,7 @@
@Test
fun testCancelWithoutCause() = runTest {
- val c = produce(coroutineContext) {
+ val c = produce {
expect(2)
send(1)
expect(3)
@@ -54,7 +54,7 @@
@Test
fun testCancelWithCause() = runTest {
- val c = produce(coroutineContext) {
+ val c = produce {
expect(2)
send(1)
expect(3)
@@ -92,7 +92,7 @@
cancelOnCompletion(coroutineContext)
}
- private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) {
+ private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = currentScope {
val source = Channel<Int>()
expect(1)
val produced = produce<Int>(coroutineContext, onCompletion = source.consumes()) {