Structured concurrency implementation:
* Introducing async, launch, produce, actor and broadcast extensions on CoroutineScope
* Deprecate top-level coroutine builders
* Introducing currentScope and coroutineScope for manipulation with CoroutineScope interface
* Introducing CoroutineScope factories
* Introducing extension CoroutineScope.isActive
Fixes #410
diff --git a/binary-compatibility-validator/test/PublicApiTest.kt b/binary-compatibility-validator/test/PublicApiTest.kt
index 0384bf7..ae2cb67 100644
--- a/binary-compatibility-validator/test/PublicApiTest.kt
+++ b/binary-compatibility-validator/test/PublicApiTest.kt
@@ -12,6 +12,7 @@
import java.util.jar.*
import kotlin.collections.ArrayList
+@Ignore
@RunWith(Parameterized::class)
class PublicApiTest(
private val rootDir: String,
diff --git a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
index 6d7874b..d88f23b 100644
--- a/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
+++ b/common/kotlinx-coroutines-core-common/src/AbstractCoroutine.kt
@@ -36,7 +36,9 @@
@Suppress("LeakingThis")
public final override val context: CoroutineContext = parentContext + this
@Deprecated("Replaced with context", replaceWith = ReplaceWith("context"))
- public final override val coroutineContext: CoroutineContext get() = context
+ public override val coroutineContext: CoroutineContext get() = context
+
+ override val isActive: Boolean get() = super<JobSupport>.isActive
/**
* Initializes parent job from the `parentContext` of this coroutine that was passed to it during construction.
diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
index e5745b9..0ba7341 100644
--- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
@@ -18,17 +18,13 @@
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
- * of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
* By default, the coroutine is immediately scheduled for execution.
- * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
+ * Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
* the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
* and will be started implicitly on the first invocation of [join][Job.join].
@@ -39,20 +35,18 @@
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
* @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
- * @param block the coroutine code.
- */
-public fun launch(
- context: CoroutineContext = DefaultDispatcher,
+ * @param block the coroutine code which will be invoked in the context of the provided scope
+ **/
+public fun CoroutineScope.launch(
+ context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend CoroutineScope.() -> Unit
): Job {
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
@@ -60,6 +54,40 @@
coroutine.start(start, coroutine, block)
return coroutine
}
+
+/**
+ * Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
+ * @suppress **Deprecated** Use [CoroutineScope.launch] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.launch(context, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun launch(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> Unit
+): Job =
+ GlobalScope.launch(context, start, onCompletion, block)
+
+/**
+ * Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
+ * @suppress **Deprecated** Use [CoroutineScope.launch] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead. This API will be hidden in the next release",
+ replaceWith = ReplaceWith("GlobalScope.launch(context + parent, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun launch(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ parent: Job? = null, // nullable for binary compatibility
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> Unit
+): Job =
+ GlobalScope.launch(context + (parent ?: EmptyCoroutineContext), start, onCompletion, block)
+
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun launch(
@@ -67,7 +95,8 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
-): Job = launch(context, start, parent, block = block)
+): Job =
+ GlobalScope.launch(context + (parent ?: EmptyCoroutineContext), start, block = block)
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
@@ -84,7 +113,7 @@
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
replaceWith = ReplaceWith("launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun launch(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> Unit): Job =
- launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
+ GlobalScope.launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
/**
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
@@ -105,6 +134,19 @@
public suspend fun <T> withContext(
context: CoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
+ block: suspend CoroutineScope.() -> T
+): T =
+ // todo: optimize fast-path to work without allocation (when there is a already a coroutine implementing scope)
+ withContextImpl(context, start) {
+ currentScope {
+ block()
+ }
+ }
+
+// todo: optimize it to reduce allocations
+private suspend fun <T> withContextImpl(
+ context: CoroutineContext,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend () -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val oldContext = uCont.context
@@ -137,6 +179,16 @@
completion.getResult()
}
+/** @suppress **Deprecated**: Binary compatibility */
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility")
+@JvmName("withContext")
+public suspend fun <T> withContext0(
+ context: CoroutineContext,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ block: suspend () -> T
+): T =
+ withContextImpl(context, start, block)
+
/** @suppress **Deprecated**: Renamed to [withContext]. */
@Deprecated(message = "Renamed to `withContext`", level=DeprecationLevel.WARNING,
replaceWith = ReplaceWith("withContext(context, start, block)"))
@@ -145,12 +197,12 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend () -> T
): T =
- withContext(context, start, block)
+ withContextImpl(context, start, block)
/** @suppress **Deprecated** */
@Deprecated(message = "It is here for binary compatibility only", level=DeprecationLevel.HIDDEN)
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
- withContext(context, start = CoroutineStart.ATOMIC, block = block)
+ withContextImpl(context, start = CoroutineStart.ATOMIC, block = block)
// --------------- implementation ---------------
diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
index 1f84a0f..1057236 100644
--- a/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
+++ b/common/kotlinx-coroutines-core-common/src/CoroutineScope.kt
@@ -4,12 +4,51 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.internal.*
import kotlin.coroutines.experimental.*
-import kotlin.internal.*
/**
- * Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient
- * and fast access to its own cancellation status via [isActive].
+ * Defines a scope for new coroutines. Every coroutine builder
+ * is an extension on [CoroutineScope] and inherits its [coroutineContext][CoroutineScope.coroutineContext]
+ * to automatically propagate both context elements and cancellation.
+ *
+ * [CoroutineScope] should be implemented on entities with well-defined lifecycle that are responsible
+ * for launching children coroutines. Example of such entity on Android is Activity.
+ * Usage of this interface may look like this:
+ *
+ * ```
+ * class MyActivity : AppCompatActivity(), CoroutineScope {
+ *
+ * override val coroutineContext: CoroutineContext
+ * get() = job + UI
+ *
+ * override fun onCreate(savedInstanceState: Bundle?) {
+ * super.onCreate(savedInstanceState)
+ * job = Job()
+ * }
+ *
+ * override fun onDestroy() {
+ * super.onDestroy()
+ * job.cancel() // Cancel job on activity destroy. After destroy all children jobs will be cancelled automatically
+ * }
+ *
+ * /*
+ * * Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
+ * * in this method throws an exception, then all nested coroutines will be cancelled.
+ * */
+ * fun loadDataFromUI() = launch { // <- extension on current activity, launched in CommonPool
+ * val ioData = async(IO) { // <- extension on launch scope, launched in IO dispatcher
+ * // long computation
+ * }
+ *
+ * withContext(UI) {
+ * val data = ioData.await()
+ * draw(data)
+ * }
+ * }
+ * }
+ *
+ * ```
*/
public interface CoroutineScope {
/**
@@ -26,18 +65,140 @@
* [CoroutineScope] is available.
* See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
* [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
+ *
+ * @suppress **Deprecated**: Deprecated in favor of top-level extension property
*/
+ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Deprecated in favor of top-level extension property")
public val isActive: Boolean
+ get() = coroutineContext[Job]?.isActive ?: true
/**
- * Returns the context of this coroutine.
- *
- * @suppress: **Deprecated**: Replaced with top-level [kotlin.coroutines.experimental.coroutineContext].
+ * Returns the context of this scope.
*/
- @Deprecated("Replace with top-level coroutineContext",
- replaceWith = ReplaceWith("coroutineContext",
- imports = ["kotlin.coroutines.experimental.coroutineContext"]))
- @LowPriorityInOverloadResolution
- @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
public val coroutineContext: CoroutineContext
-}
\ No newline at end of file
+}
+
+/**
+ * Adds the specified coroutine context to this scope, overriding existing elements in the current
+ * scope's context with the corresponding keys.
+ *
+ * This is a shorthand for `CoroutineScope(thisScope + context)`.
+ */
+public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
+ CoroutineScope(context + context)
+
+/**
+ * Returns `true` when current [Job] is still active (has not completed and was not cancelled yet).
+ *
+ * Check this property in long-running computation loops to support cancellation:
+ * ```
+ * while (_isActive) {
+ * // do some computation
+ * }
+ * ```
+ *
+ * This property is a shortcut for `coroutineContext.isActive` in the scope when
+ * [CoroutineScope] is available.
+ * See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
+ * [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
+ */
+@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
+public val CoroutineScope.isActive: Boolean
+ get() = coroutineContext[Job]?.isActive ?: true
+
+/**
+ * A global [CoroutineScope] not bound to any job.
+ *
+ * Global scope is used to launch top-level coroutines which are operating on the whole application lifetime
+ * and are not cancelled prematurely.
+ * Another use of the global scope is [Unconfined] operators, which don't have any job associated with them.
+ *
+ * Application code usually should use application-defined [CoroutineScope], using [async] or [launch]
+ * on the instance of [GlobalScope] is highly discouraged.
+ *
+ * Usage of this interface may look like this:
+ *
+ * ```
+ * fun ReceiveChannel<Int>.sqrt(): ReceiveChannel<Double> = GlobalScope.produce(Unconfined) {
+ * for (number in this) {
+ * send(Math.sqrt(number))
+ * }
+ * }
+ *
+ * ```
+ */
+object GlobalScope : CoroutineScope {
+ /**
+ * @suppress **Deprecated**: Deprecated in favor of top-level extension property
+ */
+ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Deprecated in favor of top-level extension property")
+ override val isActive: Boolean
+ get() = true
+
+ /**
+ * Returns [EmptyCoroutineContext].
+ */
+ override val coroutineContext: CoroutineContext
+ get() = EmptyCoroutineContext
+}
+
+/**
+ * Creates new [CoroutineScope] and calls the specified suspend block with this scope.
+ * The provided scope inherits its [coroutineContext][CoroutineScope.coroutineContext] from the outer scope, but overrides
+ * context's [Job].
+ *
+ * This methods returns as soon as given block and all launched from within the scope children coroutines are completed.
+ * Example of the scope usages looks like this:
+ *
+ * ```
+ * suspend fun loadDataForUI() = coroutineScope {
+ *
+ * val data = async { // <- extension on current scope
+ * ... load some UI data ...
+ * }
+ *
+ * withContext(UI) {
+ * doSomeWork()
+ * val result = data.await()
+ * display(result)
+ * }
+ * }
+ * ```
+ *
+ * Semantics of the scope in this example:
+ * 1) `loadDataForUI` returns as soon as data is loaded and UI is updated.
+ * 2) If `doSomeWork` throws an exception, then `async` task is cancelled and `loadDataForUI` rethrows that exception.
+ * 3) If outer scope of `loadDataForUI` is cancelled, both started `async` and `withContext` are cancelled.
+ *
+ * Method may throw [JobCancellationException] if the current job was cancelled externally
+ * or may throw the corresponding unhandled [Throwable] if there is any unhandled exception in this scope
+ * (for example, from a crashed coroutine that was started with [launch] in this scope).
+ */
+public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
+ // todo: optimize implementation to a single allocated object
+ val owner = ScopeOwnerCoroutine<R>(coroutineContext)
+ owner.start(CoroutineStart.UNDISPATCHED, owner, block)
+ owner.join()
+ if (owner.isCancelled) {
+ throw owner.getCancellationException().let { it.cause ?: it }
+ }
+ val state = owner.state
+ if (state is CompletedExceptionally) {
+ throw state.cause
+ }
+ @Suppress("UNCHECKED_CAST")
+ return state as R
+}
+
+/**
+ * Provides [CoroutineScope] that is already present in the current [coroutineContext] to the given [block].
+ * Note, this method doesn't wait for all launched children to complete (as opposed to [coroutineContext]).
+ */
+public suspend inline fun <R> currentScope(block: CoroutineScope.() -> R): R =
+ CoroutineScope(coroutineContext).block()
+
+/**
+ * Creates [CoroutineScope] that wraps the given [coroutineContext].
+ */
+@Suppress("FunctionName")
+public fun CoroutineScope(context: CoroutineContext): CoroutineScope = ContextScope(context)
\ No newline at end of file
diff --git a/common/kotlinx-coroutines-core-common/src/Deferred.kt b/common/kotlinx-coroutines-core-common/src/Deferred.kt
index 4fe58a9..200fda8 100644
--- a/common/kotlinx-coroutines-core-common/src/Deferred.kt
+++ b/common/kotlinx-coroutines-core-common/src/Deferred.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
@@ -125,16 +126,13 @@
/**
* Creates new coroutine and returns its future result as an implementation of [Deferred].
*
- * The running coroutine is cancelled when the resulting object is [cancelled][Job.cancel].
+ * The running coroutine is cancelled when the resulting deferred is [cancelled][Job.cancel].
+ * Parent of the created coroutine is inherited from the provided [CoroutineScope].
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.experimental/coroutine-context.html)
- * of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
* By default, the coroutine is immediately scheduled for execution.
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
@@ -142,20 +140,18 @@
* the resulting [Deferred] is created in _new_ state. It can be explicitly started with [start][Job.start]
* function and will be started implicitly on the first invocation of [join][Job.join], [await][Deferred.await] or [awaitAll].
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
* @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
-public fun <T> async(
- context: CoroutineContext = DefaultDispatcher,
+public fun <T> CoroutineScope.async(
+ context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
@@ -164,6 +160,39 @@
return coroutine
}
+/**
+ * Creates new coroutine and returns its future result as an implementation of [Deferred].
+ * @suppress **Deprecated**. Use [CoroutineScope.async] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.async(context, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun <T> async(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> T
+): Deferred<T> =
+ GlobalScope.async(context, start, onCompletion, block)
+
+/**
+ * Creates new coroutine and returns its future result as an implementation of [Deferred].
+ * @suppress **Deprecated**. Use [CoroutineScope.async] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.async(context + parent, start, onCompletion, block)", imports = ["kotlinx.coroutines.experimental.*"])
+)
+public fun <T> async(
+ context: CoroutineContext = DefaultDispatcher,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend CoroutineScope.() -> T
+): Deferred<T> =
+ GlobalScope.async(context + (parent ?: EmptyCoroutineContext), start, onCompletion, block)
+
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> async(
@@ -171,7 +200,8 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> T
-): Deferred<T> = async(context, start, parent, block = block)
+): Deferred<T> =
+ GlobalScope.async(context + (parent ?: EmptyCoroutineContext), start, block = block)
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
@@ -180,7 +210,7 @@
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> =
- async(context, start, block = block)
+ GlobalScope.async(context, start, block = block)
/**
* @suppress **Deprecated**: Use `start = CoroutineStart.XXX` parameter
@@ -188,7 +218,7 @@
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
replaceWith = ReplaceWith("async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun <T> async(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> T): Deferred<T> =
- async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
+ GlobalScope.async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)
/**
* @suppress **Deprecated**: `defer` was renamed to `async`.
@@ -196,7 +226,7 @@
@Deprecated(message = "`defer` was renamed to `async`", level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("async(context, block = block)"))
public fun <T> defer(context: CoroutineContext, block: suspend CoroutineScope.() -> T): Deferred<T> =
- async(context, block = block)
+ GlobalScope.async(context, block = block)
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index ea024ed..4c8945d 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -132,7 +132,7 @@
}
}
- public final override val isActive: Boolean get() {
+ public override val isActive: Boolean get() {
val state = this.state
return state is Incomplete && state.isActive
}
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
index f55b521..0f6b418 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Broadcast.kt
@@ -7,6 +7,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlin.coroutines.experimental.*
@@ -27,6 +28,29 @@
}
/**
+ * Launches new coroutine to produce a stream of values by sending them to a broadcast channel.
+ * Deprecated, use [CoroutineScope.broadcast] instead.
+ */
+@Deprecated(message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead. This API will be hidden in the next release")
+public fun <E> broadcast(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 1,
+ start: CoroutineStart = CoroutineStart.LAZY,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend ProducerScope<E>.() -> Unit
+): BroadcastChannel<E> {
+ val channel = BroadcastChannel<E>(capacity)
+ val newContext = newCoroutineContext(context, parent)
+ val coroutine = if (start.isLazy)
+ LazyBroadcastCoroutine(newContext, channel, block) else
+ BroadcastCoroutine(newContext, channel, active = true)
+ if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
+ coroutine.start(start, coroutine, block)
+ return coroutine
+}
+
+/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
@@ -36,13 +60,10 @@
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
@@ -62,20 +83,18 @@
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
-public fun <E> broadcast(
- context: CoroutineContext = DefaultDispatcher,
+public fun <E> CoroutineScope.broadcast(
+ context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> {
+ val newContext = newCoroutineContext(context)
val channel = BroadcastChannel<E>(capacity)
- val newContext = newCoroutineContext(context, parent)
val coroutine = if (start.isLazy)
LazyBroadcastCoroutine(newContext, channel, block) else
BroadcastCoroutine(newContext, channel, active = true)
@@ -89,6 +108,9 @@
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
+
+ override val isActive: Boolean get() = super<AbstractCoroutine>.isActive
+
override val channel: SendChannel<E>
get() = this
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt b/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt
index 3ad8f8e..be51b29 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Channels.common.kt
@@ -19,7 +19,7 @@
* Returns a channel to read all element of the [Iterable].
*/
public fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context) {
+ GlobalScope.produce(context) {
for (element in this@asReceiveChannel)
send(element)
}
@@ -28,7 +28,7 @@
* Returns a channel to read all element of the [Sequence].
*/
public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context) {
+ GlobalScope.produce(context) {
for (element in this@asReceiveChannel)
send(element)
}
@@ -499,7 +499,7 @@
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
require(n >= 0) { "Requested element count $n is less than zero." }
var remaining: Int = n
if (remaining > 0)
@@ -521,7 +521,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@dropWhile) {
if (!predicate(e)) {
send(e)
@@ -541,7 +541,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@filter) {
if (predicate(e)) send(e)
}
@@ -557,7 +557,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
var index = 0
for (e in this@filterIndexed) {
if (predicate(index++, e)) send(e)
@@ -705,7 +705,7 @@
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
if (n == 0) return@produce
require(n >= 0) { "Requested element count $n is less than zero." }
var remaining: Int = n
@@ -725,7 +725,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@takeWhile) {
if (!predicate(e)) return@produce
send(e)
@@ -909,7 +909,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
for (e in this@flatMap) {
transform(e).toChannel(this)
}
@@ -986,7 +986,7 @@
*/
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
consumeEach {
send(transform(it))
}
@@ -1003,7 +1003,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
var index = 0
for (e in this@mapIndexed) {
send(transform(index++, e))
@@ -1163,7 +1163,7 @@
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Unconfined): ReceiveChannel<IndexedValue<E>> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
var index = 0
for (e in this@withIndex) {
send(IndexedValue(index++, e))
@@ -1192,7 +1192,7 @@
*/
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
- produce(context, onCompletion = consumes()) {
+ GlobalScope.produce(context, onCompletion = consumes()) {
val keys = HashSet<K>()
for (e in this@distinctBy) {
val k = selector(e)
@@ -1530,7 +1530,7 @@
*/
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
- produce(context, onCompletion = consumesAll(this, other)) {
+ GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
val otherIterator = other.iterator()
this@zip.consumeEach { element1 ->
if (!otherIterator.hasNext()) return@consumeEach
diff --git a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
index 4e9510c..8d4e550 100644
--- a/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
+++ b/common/kotlinx-coroutines-core-common/src/channels/Produce.kt
@@ -6,6 +6,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.internal.*
import kotlin.coroutines.experimental.*
/**
@@ -34,6 +35,27 @@
/**
* Launches new coroutine to produce a stream of values by sending them to a channel
+ * and returns a reference to the coroutine as a [ReceiveChannel].
+ * Deprecated, use [CoroutineScope.produce]
+ */
+@Deprecated(message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead. This API will be hidden in the next release")
+public fun <E> produce(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 0,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> {
+ val channel = Channel<E>(capacity)
+ val newContext = newCoroutineContext(context, parent)
+ val coroutine = ProducerCoroutine(newContext, channel)
+ if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ return coroutine
+}
+
+/**
+ * Launches new coroutine to produce a stream of values by sending them to a channel
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
* object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
*
@@ -43,13 +65,10 @@
* when the coroutine completes.
* The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
@@ -65,19 +84,17 @@
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param capacity capacity of the channel's buffer (no buffer by default).
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
-public fun <E> produce(
- context: CoroutineContext = DefaultDispatcher,
+public fun <E> CoroutineScope.produce(
+ context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(capacity)
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@@ -116,6 +133,10 @@
private class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E> {
+
+ override val isActive: Boolean
+ get() = super<ChannelCoroutine>.isActive
+
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
val cause = exceptionally?.cause
val processed = when (exceptionally) {
diff --git a/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
new file mode 100644
index 0000000..ee51615
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/src/internal/Scopes.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental.internal
+
+import kotlinx.coroutines.experimental.*
+import kotlin.coroutines.experimental.*
+
+internal class ScopeOwnerCoroutine<R>(
+ parentContext: CoroutineContext
+) : AbstractCoroutine<R>(parentContext, true), CoroutineScope {
+
+ override val coroutineContext: CoroutineContext = parentContext + this
+
+ /*
+ * Always return true, so final exception is in the scope before its completion.
+ */
+ override fun cancel(cause: Throwable?): Boolean {
+ super.cancel(cause)
+ return true
+ }
+}
+
+internal class ContextScope(context: CoroutineContext) : CoroutineScope {
+ override val coroutineContext: CoroutineContext = context
+}
+
+internal fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext =
+ newCoroutineContext(coroutineContext + context, parent = null)
diff --git a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
index de7e004..0496904 100644
--- a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
@@ -6,7 +6,6 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class AsyncTest : TestBase() {
@@ -221,5 +220,22 @@
finish(2)
}
+ @Test
+ fun testOverriddenParent() = runTest {
+ val parent = Job()
+ val deferred = async(parent, CoroutineStart.ATOMIC) {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ parent.cancel()
+ try {
+ expect(1)
+ deferred.await()
+ } catch (e: JobCancellationException) {
+ finish(3)
+ }
+ }
+
private class TestException : Exception()
}
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
new file mode 100644
index 0000000..22652f2
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+class CoroutineScopeTest : TestBase() {
+
+ @Test
+ fun testScope() = runTest {
+ suspend fun callJobScoped() = coroutineScope {
+ expect(2)
+
+ launch {
+ expect(4)
+ }
+
+ launch {
+ expect(5)
+
+ launch {
+ expect(7)
+ }
+
+ expect(6)
+
+ }
+
+ expect(3)
+ 42
+ }
+
+
+ expect(1)
+ val result = callJobScoped()
+ assertEquals(42, result)
+ yield() // Check we're not cancelled
+ finish(8)
+ }
+
+ @Test
+ fun testScopeCancelledFromWithin() = runTest {
+ expect(1)
+ suspend fun callJobScoped() = coroutineScope {
+
+ launch {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ launch {
+ expect(3)
+ throw IllegalArgumentException()
+ }
+ }
+
+ try {
+ callJobScoped()
+ expectUnreached()
+ } catch (e: IllegalArgumentException) {
+ expect(4)
+ }
+
+ yield() // Check we're not cancelled
+ finish(5)
+ }
+
+ @Test
+ fun testScopeBlockThrows() = runTest {
+ expect(1)
+ suspend fun callJobScoped(): Unit = coroutineScope {
+
+ launch {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ yield() // let launch sleep
+ throw NotImplementedError()
+ }
+
+ try {
+ callJobScoped()
+ expectUnreached()
+ } catch (e: NotImplementedError) {
+ expect(3)
+ }
+
+ yield() // Check we're not cancelled
+ finish(4)
+ }
+
+ @Test
+ fun testOuterJobIsCancelled() = runTest {
+
+ suspend fun callJobScoped() = coroutineScope {
+
+ launch {
+ expect(3)
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ expect(4)
+ }
+ }
+
+ expect(2)
+ delay(Long.MAX_VALUE)
+ 42
+ }
+
+
+ val outerJob = launch(coroutineContext.minusKey(Job)) {
+ expect(1)
+ try {
+ callJobScoped()
+ expectUnreached()
+ } catch (e: JobCancellationException) {
+ expect(5)
+ assertNull(e.cause)
+ }
+ }
+
+ repeat(3) { yield() } // let everything to start properly
+ outerJob.cancel()
+ outerJob.join()
+ finish(6)
+ }
+
+ @Test
+ @Suppress("UNREACHABLE_CODE")
+ fun testDocumentationExample() = runTest {
+ suspend fun loadData() = coroutineScope {
+ expect(1)
+ val data = async {
+ try {
+ delay(Long.MAX_VALUE)
+ } finally {
+ expect(3)
+ }
+ }
+
+ yield()
+
+ // UI updater
+ withContext(coroutineContext) {
+ expect(2)
+ throw AssertionError()
+ data.await() // Actually unreached
+ expectUnreached()
+ }
+ }
+
+
+ try {
+ loadData()
+ expectUnreached()
+ } catch (e: AssertionError) {
+ finish(4)
+ }
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
index 70d89ca..283a6d4 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutinesTest.kt
@@ -6,7 +6,6 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class CoroutinesTest : TestBase() {
diff --git a/common/kotlinx-coroutines-core-common/test/CurrentScopeTest.kt b/common/kotlinx-coroutines-core-common/test/CurrentScopeTest.kt
new file mode 100644
index 0000000..db17064
--- /dev/null
+++ b/common/kotlinx-coroutines-core-common/test/CurrentScopeTest.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.experimental
+
+import kotlin.test.*
+
+class CurrentScopeTest : TestBase() {
+
+ @Test
+ fun testScope() = runTest {
+ suspend fun callJobScoped() = currentScope {
+ launch {
+ finish(3)
+ }
+ }
+
+
+ expect(1)
+ callJobScoped()
+ expect(2)
+ }
+
+ @Test
+ fun testNestedScope() = runTest {
+ suspend fun callJobScoped() = currentScope {
+ launch {
+ expect(2)
+ }
+ }
+
+ expect(1)
+ coroutineScope {
+ callJobScoped()
+ }
+
+ finish(3)
+ }
+
+ @Test
+ fun testThrowException() = runTest(expected = { it is IndexOutOfBoundsException }) {
+ suspend fun callJobScoped() = currentScope {
+ launch {
+ finish(3)
+ throw IndexOutOfBoundsException()
+ }
+ }
+
+ expect(1)
+ callJobScoped()
+ expect(2)
+ }
+}
diff --git a/common/kotlinx-coroutines-core-common/test/JobTest.kt b/common/kotlinx-coroutines-core-common/test/JobTest.kt
index 1beccb8..f83a893 100644
--- a/common/kotlinx-coroutines-core-common/test/JobTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/JobTest.kt
@@ -4,7 +4,6 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class JobTest : TestBase() {
@@ -175,4 +174,18 @@
job.cancelAndJoin()
finish(4)
}
+
+ @Test
+ fun testOverriddenParent() = runTest {
+ val parent = Job()
+ val deferred = launch(parent, CoroutineStart.ATOMIC) {
+ expect(2)
+ delay(Long.MAX_VALUE)
+ }
+
+ parent.cancel()
+ expect(1)
+ deferred.join()
+ finish(3)
+ }
}
diff --git a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
index 9e40c1b..bed85d9 100644
--- a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
@@ -145,8 +145,7 @@
try {
withContext(job + wrapperDispatcher(coroutineContext), CoroutineStart.ATOMIC) {
- require(isActive)
- // but start atomically
+ require(!isActive) // but it had still started, because atomically
expect(2)
yield() // but will cancel here
expectUnreached()
@@ -165,7 +164,7 @@
val job = Job()
job.cancel() // try to cancel before it has a chance to run
withContext(job + wrapperDispatcher(coroutineContext), CoroutineStart.UNDISPATCHED) { // but start atomically
- require(isActive)
+ require(!isActive) // but it had still started, because undispatched
finish(2)
yield() // but will cancel here
expectUnreached()
diff --git a/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt b/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt
index 0f0d059..74bdb44 100644
--- a/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/channels/BroadcastTest.kt
@@ -12,7 +12,7 @@
@Test
fun testBroadcastBasic() = runTest {
expect(1)
- val b = broadcast(coroutineContext) {
+ val b = broadcast {
expect(4)
send(1) // goes to receiver
expect(5)
diff --git a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
index 70252a7..40b4e5b 100644
--- a/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/channels/ProduceTest.kt
@@ -12,7 +12,7 @@
@Test
fun testBasic() = runTest {
- val c = produce(coroutineContext) {
+ val c = produce {
expect(2)
send(1)
expect(3)
@@ -30,7 +30,7 @@
@Test
fun testCancelWithoutCause() = runTest {
- val c = produce(coroutineContext) {
+ val c = produce {
expect(2)
send(1)
expect(3)
@@ -54,7 +54,7 @@
@Test
fun testCancelWithCause() = runTest {
- val c = produce(coroutineContext) {
+ val c = produce {
expect(2)
send(1)
expect(3)
@@ -92,7 +92,7 @@
cancelOnCompletion(coroutineContext)
}
- private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) {
+ private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = currentScope {
val source = Channel<Int>()
expect(1)
val produced = produce<Int>(coroutineContext, onCompletion = source.consumes()) {
diff --git a/core/kotlinx-coroutines-core/src/channels/Actor.kt b/core/kotlinx-coroutines-core/src/channels/Actor.kt
index 2139469..43b4a35 100644
--- a/core/kotlinx-coroutines-core/src/channels/Actor.kt
+++ b/core/kotlinx-coroutines-core/src/channels/Actor.kt
@@ -6,6 +6,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
+import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
@@ -35,6 +36,22 @@
/**
* Launches new coroutine that is receiving messages from its mailbox channel
+ * and returns a reference to its mailbox channel as a [SendChannel].
+ * Deprecated, use [CoroutineScope.actor] instead.
+ */
+@Deprecated(message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead. This API will be hidden in the next release")
+public fun <E> actor(
+ context: CoroutineContext = DefaultDispatcher,
+ capacity: Int = 0,
+ start: CoroutineStart = CoroutineStart.DEFAULT,
+ parent: Job? = null,
+ onCompletion: CompletionHandler? = null,
+ block: suspend ActorScope<E>.() -> Unit
+): SendChannel<E> =
+ CoroutineScope(context).actor(parent ?: EmptyCoroutineContext, capacity, start, onCompletion, block)
+
+/**
+ * Launches new coroutine that is receiving messages from its mailbox channel
* and returns a reference to its mailbox channel as a [SendChannel]. The resulting
* object can be used to [send][SendChannel.send] messages to this coroutine.
*
@@ -43,13 +60,10 @@
* [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
* By default, the coroutine is immediately scheduled for execution.
* Other options can be specified via `start` parameter. See [CoroutineStart] for details.
@@ -98,7 +112,7 @@
*
* ```
* val job = Job()
- * val c = actor(parent = job) { ... }
+ * val c = actor(context = job) { ... }
* ...
* // abort the actor
* job.cancel()
@@ -108,22 +122,20 @@
* "`for (msg in channel)`" and other cancellable suspending functions throw [CancellationException] and actor
* completes without processing remaining messages.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine
* @param capacity capacity of the channel's buffer (no buffer by default).
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param onCompletion optional completion handler for the actor coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
-public fun <E> actor(
- context: CoroutineContext = DefaultDispatcher,
+public fun <E> CoroutineScope.actor(
+ context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
- parent: Job? = null,
onCompletion: CompletionHandler? = null,
block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> {
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val channel = Channel<E>(capacity)
val coroutine = if (start.isLazy)
LazyActorCoroutine(newContext, channel, block) else
diff --git a/core/kotlinx-coroutines-core/src/channels/TickerChannels.kt b/core/kotlinx-coroutines-core/src/channels/TickerChannels.kt
index e8b9d26..a402c4e 100644
--- a/core/kotlinx-coroutines-core/src/channels/TickerChannels.kt
+++ b/core/kotlinx-coroutines-core/src/channels/TickerChannels.kt
@@ -64,8 +64,8 @@
): ReceiveChannel<Unit> {
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
- return produce(Unconfined + context, capacity = 0) {
- when(mode) {
+ return GlobalScope.produce(Unconfined + context, capacity = 0) {
+ when (mode) {
TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delay, unit, initialDelay, channel)
TickerMode.FIXED_DELAY -> fixedDelayTicker(delay, unit, initialDelay, channel)
}
diff --git a/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt b/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
index 6315da8..7de2ff7 100644
--- a/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/AsyncJvmTest.kt
@@ -4,11 +4,9 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class AsyncJvmTest : TestBase() {
-
// This must be a common test but it fails on JS because of KT-21961
@Test
fun testAsyncWithFinally() = runTest {
diff --git a/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt b/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
index 1b2f023..80731d8 100644
--- a/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
+++ b/core/kotlinx-coroutines-core/test/CoroutinesJvmTest.kt
@@ -4,12 +4,11 @@
package kotlinx.coroutines.experimental
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class CoroutinesJvmTest : TestBase() {
@Test
- fun testNotCancellableCodeWithExceptionCancelled() = runTest {
+ fun testNotCancellableCodeWithExceptionCancelled() = runTest(expected = {e -> e is TestException}) {
expect(1)
// CoroutineStart.ATOMIC makes sure it will not get cancelled for it starts executing
val job = launch(start = CoroutineStart.ATOMIC) {
@@ -37,5 +36,5 @@
private fun throwTestException(): Unit = throw TestException()
- private class TestException() : Exception()
+ private class TestException : Exception()
}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/test/ThreadContextElementTest.kt b/core/kotlinx-coroutines-core/test/ThreadContextElementTest.kt
index a8b17d2..47a2cf2 100644
--- a/core/kotlinx-coroutines-core/test/ThreadContextElementTest.kt
+++ b/core/kotlinx-coroutines-core/test/ThreadContextElementTest.kt
@@ -9,6 +9,7 @@
import kotlin.test.*
class ThreadContextElementTest : TestBase() {
+
@Test
fun testExample() = runTest {
val exceptionHandler = coroutineContext[CoroutineExceptionHandler]!!
@@ -17,7 +18,7 @@
val data = MyData()
val element = MyElement(data)
assertNull(myThreadLocal.get())
- val job = launch(element + exceptionHandler) {
+ val job = GlobalScope.launch(element + exceptionHandler) {
assertTrue(mainThread != Thread.currentThread())
assertSame(element, coroutineContext[MyElement])
assertSame(data, myThreadLocal.get())
@@ -40,7 +41,7 @@
val exceptionHandler = coroutineContext[CoroutineExceptionHandler]!!
val data = MyData()
val element = MyElement(data)
- val job = launch(
+ val job = GlobalScope.launch(
context = DefaultDispatcher + exceptionHandler + element,
start = CoroutineStart.UNDISPATCHED
) {
@@ -59,12 +60,12 @@
expect(1)
newSingleThreadContext("withContext").use {
val data = MyData()
- async(CommonPool + MyElement(data)) {
+ GlobalScope.async(CommonPool + MyElement(data)) {
assertSame(data, myThreadLocal.get())
expect(2)
val newData = MyData()
- async(it + MyElement(newData)) {
+ GlobalScope.async(it + MyElement(newData)) {
assertSame(newData, myThreadLocal.get())
expect(3)
}.await()
@@ -74,7 +75,7 @@
expect(4)
}
- async(it) {
+ GlobalScope.async(it) {
assertNull(myThreadLocal.get())
expect(5)
}.await()
diff --git a/core/kotlinx-coroutines-core/test/ThreadLocalTest.kt b/core/kotlinx-coroutines-core/test/ThreadLocalTest.kt
index f3a6c40..b535a62 100644
--- a/core/kotlinx-coroutines-core/test/ThreadLocalTest.kt
+++ b/core/kotlinx-coroutines-core/test/ThreadLocalTest.kt
@@ -6,7 +6,6 @@
import org.junit.*
import org.junit.Test
-import kotlin.coroutines.experimental.*
import kotlin.test.*
@Suppress("RedundantAsync")
@@ -79,8 +78,7 @@
fun testConflictingThreadLocals() = runTest {
intThreadLocal.set(42)
- val deferred = async(CommonPool
- + intThreadLocal.asContextElement(1)) {
+ val deferred = GlobalScope.async(intThreadLocal.asContextElement(1)) {
assertEquals(1, intThreadLocal.get())
withContext(executor + intThreadLocal.asContextElement(42)) {
@@ -89,14 +87,14 @@
assertEquals(1, intThreadLocal.get())
- val deferred = async(coroutineContext + intThreadLocal.asContextElement(53)) {
+ val deferred = GlobalScope.async(coroutineContext + intThreadLocal.asContextElement(53)) {
assertEquals(53, intThreadLocal.get())
}
deferred.await()
assertEquals(1, intThreadLocal.get())
- val deferred2 = async(executor) {
+ val deferred2 = GlobalScope.async(executor) {
assertNull(intThreadLocal.get())
}
@@ -173,12 +171,12 @@
expect(1)
newSingleThreadContext("withContext").use {
val data = 42
- async(CommonPool + intThreadLocal.asContextElement(42)) {
+ GlobalScope.async(CommonPool + intThreadLocal.asContextElement(42)) {
assertSame(data, intThreadLocal.get())
expect(2)
- async(it + intThreadLocal.asContextElement(31)) {
+ GlobalScope.async(it + intThreadLocal.asContextElement(31)) {
assertEquals(31, intThreadLocal.get())
expect(3)
}.await()
@@ -188,7 +186,7 @@
expect(4)
}
- async(it) {
+ GlobalScope.async(it) {
assertNull(intThreadLocal.get())
expect(5)
}.await()
@@ -199,4 +197,19 @@
finish(7)
}
+
+ @Test
+ fun testScope() = runTest {
+ intThreadLocal.set(42)
+ val mainThread = Thread.currentThread()
+ GlobalScope.async {
+ assertNull(intThreadLocal.get())
+ assertNotSame(mainThread, Thread.currentThread())
+ }.await()
+
+ GlobalScope.async(intThreadLocal.asContextElement()) {
+ assertEquals(42, intThreadLocal.get())
+ assertNotSame(mainThread, Thread.currentThread())
+ }.await()
+ }
}
diff --git a/core/kotlinx-coroutines-core/test/channels/ActorLazyTest.kt b/core/kotlinx-coroutines-core/test/channels/ActorLazyTest.kt
index c1aa708..6152cf0 100644
--- a/core/kotlinx-coroutines-core/test/channels/ActorLazyTest.kt
+++ b/core/kotlinx-coroutines-core/test/channels/ActorLazyTest.kt
@@ -8,13 +8,12 @@
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
-import kotlin.coroutines.experimental.*
class ActorLazyTest : TestBase() {
@Test
- fun testEmptyStart() = runBlocking<Unit> {
+ fun testEmptyStart() = runBlocking {
expect(1)
- val actor = actor<String>(coroutineContext, start = CoroutineStart.LAZY) {
+ val actor = actor<String>(start = CoroutineStart.LAZY) {
expect(5)
}
actor as Job // type assertion
@@ -38,9 +37,9 @@
}
@Test
- fun testOne() = runBlocking<Unit> {
+ fun testOne() = runBlocking {
expect(1)
- val actor = actor<String>(coroutineContext, start = CoroutineStart.LAZY) {
+ val actor = actor<String>(start = CoroutineStart.LAZY) {
expect(4)
assertThat(receive(), IsEqual("OK"))
expect(5)
diff --git a/core/kotlinx-coroutines-core/test/channels/ActorTest.kt b/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
index 9546fc3..1132858 100644
--- a/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
+++ b/core/kotlinx-coroutines-core/test/channels/ActorTest.kt
@@ -11,7 +11,6 @@
import org.junit.runner.*
import org.junit.runners.*
import java.io.*
-import kotlin.coroutines.experimental.*
@RunWith(Parameterized::class)
class ActorTest(private val capacity: Int) : TestBase() {
@@ -25,7 +24,7 @@
@Test
fun testEmpty() = runBlocking {
expect(1)
- val actor = actor<String>(coroutineContext, capacity) {
+ val actor = actor<String>(capacity = capacity) {
expect(3)
}
actor as Job // type assertion
@@ -43,7 +42,7 @@
@Test
fun testOne() = runBlocking {
expect(1)
- val actor = actor<String>(coroutineContext, capacity) {
+ val actor = actor<String>(capacity = capacity) {
expect(3)
assertThat(receive(), IsEqual("OK"))
expect(6)
@@ -70,7 +69,7 @@
@Test
fun testCloseWithoutCause() = runTest {
- val actor = actor<Int>(coroutineContext, capacity) {
+ val actor = actor<Int>(capacity = capacity) {
val element = channel.receiveOrNull()
expect(2)
assertEquals(42, element)
@@ -89,7 +88,7 @@
@Test
fun testCloseWithCause() = runTest {
- val actor = actor<Int>(coroutineContext, capacity) {
+ val actor = actor<Int>(capacity = capacity) {
val element = channel.receiveOrNull()
expect(2)
require(element!! == 42)
@@ -110,7 +109,7 @@
@Test
fun testCancelEnclosingJob() = runTest {
- val job = async(coroutineContext) {
+ val job = async {
actor<Int>(coroutineContext, capacity) {
expect(1)
channel.receiveOrNull()
@@ -138,7 +137,7 @@
@Test
fun testThrowingActor() = runTest(unhandled = listOf({e -> e is IllegalArgumentException})) {
val parent = Job()
- val actor = actor<Int>(context = coroutineContext, parent = parent) {
+ val actor = actor<Int>(parent) {
channel.consumeEach {
expect(1)
throw IllegalArgumentException()
@@ -154,7 +153,7 @@
@Test
fun testChildJob() = runTest {
val parent = Job()
- actor<Int>(context = coroutineContext, parent = parent) {
+ actor<Int>(parent) {
launch(coroutineContext) {
try {
delay(Long.MAX_VALUE)
diff --git a/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt b/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt
index d2c2e73..1ec9846 100644
--- a/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt
+++ b/core/kotlinx-coroutines-core/test/channels/ChannelsConsumeTest.kt
@@ -15,7 +15,7 @@
private val sourceList = (1..10).toList()
// test source with numbers 1..10
- private fun testSource(context: CoroutineContext) = produce(context) {
+ private fun testSource(context: CoroutineContext) = CoroutineScope(context).produce {
for (i in sourceList) {
send(i)
}
@@ -479,7 +479,7 @@
fun testFlatMap() {
checkTransform(sourceList.flatMap { listOf("A$it", "B$it") }) { ctx ->
flatMap(ctx) {
- produce {
+ GlobalScope.produce {
send("A$it")
send("B$it")
}
diff --git a/core/kotlinx-coroutines-core/test/channels/TickerChannelCommonTest.kt b/core/kotlinx-coroutines-core/test/channels/TickerChannelCommonTest.kt
index c5833e0..b413097 100644
--- a/core/kotlinx-coroutines-core/test/channels/TickerChannelCommonTest.kt
+++ b/core/kotlinx-coroutines-core/test/channels/TickerChannelCommonTest.kt
@@ -96,7 +96,7 @@
@Test
fun testComplexOperator() = withVirtualTimeSource {
runTest {
- val producer = produce {
+ val producer = GlobalScope.produce {
for (i in 1..7) {
send(i)
delay(1000)
@@ -108,7 +108,7 @@
}
}
- private fun ReceiveChannel<Int>.averageInTimeWindow(timespan: Long) = produce {
+ private fun ReceiveChannel<Int>.averageInTimeWindow(timespan: Long) = GlobalScope.produce {
val delayChannel = channelFactory(delay = timespan, initialDelay = timespan)
var sum = 0
var n = 0
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
index e4c09a5..f47cd13 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobExceptionsStressTest.kt
@@ -9,7 +9,6 @@
import org.junit.Test
import java.io.*
import java.util.concurrent.*
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class JobExceptionsStressTest : TestBase() {
@@ -31,7 +30,7 @@
repeat(1000 * stressTestMultiplier) {
val exception = runBlock(executor) {
val barrier = CyclicBarrier(4)
- val job = launch(coroutineContext.minusKey(Job)) {
+ val job = GlobalScope.launch(coroutineContext.minusKey(Job)) {
launch(coroutineContext) {
barrier.await()
diff --git a/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt b/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
index ea66dd0..1bed8cc 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/JobNestedExceptionsTest.kt
@@ -7,7 +7,6 @@
import kotlinx.coroutines.experimental.*
import org.junit.Test
import java.io.*
-import kotlin.coroutines.experimental.*
import kotlin.test.*
class JobNestedExceptionsTest : TestBase() {
@@ -69,10 +68,10 @@
fun testNestedAtomicThrow() {
val exception = runBlock {
expect(1)
- val job = launch(coroutineContext.minusKey(Job), CoroutineStart.ATOMIC) {
+ val job = GlobalScope.launch(coroutineContext.minusKey(Job), CoroutineStart.ATOMIC) {
expect(2)
- launch(coroutineContext, CoroutineStart.ATOMIC) {
+ GlobalScope.launch(coroutineContext, CoroutineStart.ATOMIC) {
expect(3)
throw IOException()
}
@@ -92,12 +91,12 @@
fun testChildThrowsDuringCompletion() {
val exceptions = runBlockForMultipleExceptions {
expect(1)
- val job = launch(coroutineContext.minusKey(Job), CoroutineStart.ATOMIC) {
+ val job = GlobalScope.launch(coroutineContext.minusKey(Job), CoroutineStart.ATOMIC) {
expect(2)
- launch(coroutineContext, CoroutineStart.ATOMIC) {
+ GlobalScope.launch(coroutineContext, CoroutineStart.ATOMIC) {
expect(3)
- launch(coroutineContext, CoroutineStart.ATOMIC) {
+ GlobalScope.launch(coroutineContext, CoroutineStart.ATOMIC) {
// This child attaches to the parent and throws after parent completion
expect(4)
throw NullPointerException()
diff --git a/core/kotlinx-coroutines-core/test/exceptions/CancellableContinuationExceptionHandlingTest.kt b/core/kotlinx-coroutines-core/test/exceptions/WithContextExceptionHandlingTest.kt
similarity index 80%
rename from core/kotlinx-coroutines-core/test/exceptions/CancellableContinuationExceptionHandlingTest.kt
rename to core/kotlinx-coroutines-core/test/exceptions/WithContextExceptionHandlingTest.kt
index 73522c0..ecdfde8 100644
--- a/core/kotlinx-coroutines-core/test/exceptions/CancellableContinuationExceptionHandlingTest.kt
+++ b/core/kotlinx-coroutines-core/test/exceptions/WithContextExceptionHandlingTest.kt
@@ -7,12 +7,10 @@
import kotlinx.coroutines.experimental.*
import org.junit.Test
import java.io.*
-import java.nio.channels.*
import kotlin.coroutines.experimental.*
import kotlin.test.*
-class CancellableContinuationExceptionHandlingTest : TestBase() {
-
+class WithContextExceptionHandlingTest : TestBase() {
@Test
fun testCancellation() = runTest {
/*
@@ -20,7 +18,7 @@
* Continuation itself throws ISE
* Result: ISE with suppressed CancellationException
*/
- runCancellation(coroutineContext, null, IllegalStateException()) { e ->
+ runCancellation(null, IllegalStateException()) { e ->
assertNull(e.cause)
val suppressed = e.suppressed()
assertTrue(suppressed.size == 1)
@@ -39,7 +37,7 @@
* Result: ISE with suppressed CancellationException(IOE)
*/
val cancellationCause = IOException()
- runCancellation(coroutineContext, cancellationCause, IllegalStateException()) { e ->
+ runCancellation(cancellationCause, IllegalStateException()) { e ->
assertNull(e.cause)
val suppressed = e.suppressed()
assertTrue(suppressed.size == 1)
@@ -58,7 +56,7 @@
* Result: ISE
*/
val cancellationCause = IllegalStateException()
- runCancellation(coroutineContext, cancellationCause, cancellationCause) { e ->
+ runCancellation(cancellationCause, cancellationCause) { e ->
assertNull(e.cause)
val suppressed = e.suppressed()
assertTrue(suppressed.isEmpty())
@@ -73,7 +71,7 @@
* Result: CE
*/
val cancellationCause = CancellationException()
- runCancellation(coroutineContext, cancellationCause, cancellationCause) { e ->
+ runCancellation(cancellationCause, cancellationCause) { e ->
assertNull(e.cause)
assertSame(e, cancellationCause)
val suppressed = e.suppressed()
@@ -91,7 +89,7 @@
val cancellationCause = CancellationException()
val exception = IOException()
cancellationCause.initCause(exception)
- runCancellation(coroutineContext, cancellationCause, exception) { e ->
+ runCancellation(cancellationCause, exception) { e ->
assertNull(e.cause)
assertSame(exception, e)
assertTrue(e.suppressed().isEmpty())
@@ -108,7 +106,7 @@
val cancellationCause = IllegalStateException()
val thrown = CancellationException()
thrown.initCause(IOException())
- runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+ runCancellation(cancellationCause, thrown) { e ->
assertSame(thrown, e)
assertEquals(1, thrown.suppressed().size)
@@ -127,7 +125,7 @@
*/
val cancellationCause = IllegalStateException()
val thrown = CancellationException()
- runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+ runCancellation(cancellationCause, thrown) { e ->
assertSame(thrown, e)
assertEquals(1, thrown.suppressed().size)
@@ -147,7 +145,7 @@
*/
val cancellationCause = CancellationException()
val thrown = CancellationException()
- runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+ runCancellation(cancellationCause, thrown) { e ->
assertSame(thrown, e)
assertNull(e.cause)
assertTrue(e.suppressed().isEmpty())
@@ -164,7 +162,7 @@
* Result: CE(ISE)
*/
thrown.initCause(cancellationCause)
- runCancellation(coroutineContext, cancellationCause, thrown) { e ->
+ runCancellation(cancellationCause, thrown) { e ->
assertSame(thrown, e)
assertSame(cancellationCause, e.cause)
assertEquals(0, thrown.suppressed().size)
@@ -175,7 +173,7 @@
@Test
fun testThrowingCancellation() = runTest {
val thrown = CancellationException()
- runThrowingContinuation(coroutineContext, thrown) { e ->
+ runThrowing(thrown) { e ->
assertSame(thrown, e)
}
}
@@ -184,14 +182,14 @@
fun testThrowingCancellationWithCause() = runTest {
val thrown = CancellationException()
thrown.initCause(IOException())
- runThrowingContinuation(coroutineContext, thrown) { e ->
+ runThrowing(thrown) { e ->
assertSame(thrown, e)
}
}
@Test
fun testCancel() = runTest {
- runOnlyCancellation(coroutineContext, null) { e ->
+ runOnlyCancellation(null) { e ->
assertNull(e.cause)
assertTrue(e.suppressed().isEmpty())
}
@@ -200,7 +198,7 @@
@Test
fun testCancelWithCause() = runTest {
val cause = IOException()
- runOnlyCancellation(coroutineContext, cause) { e ->
+ runOnlyCancellation(cause) { e ->
assertSame(cause, e.cause)
assertTrue(e.suppressed().isEmpty())
}
@@ -209,7 +207,7 @@
@Test
fun testCancelWithCancellationException() = runTest {
val cause = CancellationException()
- runThrowingContinuation(coroutineContext, cause) { e ->
+ runThrowing(cause) { e ->
assertSame(cause, e)
assertNull(e.cause)
assertTrue(e.suppressed().isEmpty())
@@ -225,19 +223,17 @@
}
}
- private suspend inline fun <reified T : Exception> CoroutineScope.runCancellation(
- coroutineContext: CoroutineContext,
+ private suspend inline fun <reified T : Exception> runCancellation(
cancellationCause: Exception?,
- thrownException: T, exceptionChecker: (T) -> Unit
+ thrownException: T,
+ exceptionChecker: (T) -> Unit
) {
-
expect(1)
val job = Job()
job.cancel(cancellationCause)
-
try {
withContext(wrapperDispatcher(coroutineContext) + job, CoroutineStart.ATOMIC) {
- require(isActive)
+ require(!isActive) // already cancelled
expect(2)
throw thrownException
}
@@ -247,15 +243,13 @@
finish(3)
return
}
-
fail()
}
- private suspend inline fun <reified T : Exception> CoroutineScope.runThrowingContinuation(
- coroutineContext: CoroutineContext,
- thrownException: T, exceptionChecker: (T) -> Unit
+ private suspend inline fun <reified T : Exception> runThrowing(
+ thrownException: T,
+ exceptionChecker: (T) -> Unit
) {
-
expect(1)
try {
withContext(wrapperDispatcher(coroutineContext), CoroutineStart.ATOMIC) {
@@ -269,21 +263,19 @@
finish(3)
return
}
-
fail()
}
- private suspend inline fun CoroutineScope.runOnlyCancellation(
- coroutineContext: CoroutineContext,
- cancellationCause: Exception?, exceptionChecker: (CancellationException) -> Unit
+ private suspend inline fun runOnlyCancellation(
+ cancellationCause: Exception?,
+ exceptionChecker: (CancellationException) -> Unit
) {
-
expect(1)
val job = Job()
job.cancel(cancellationCause)
try {
withContext(wrapperDispatcher(coroutineContext) + job, CoroutineStart.ATOMIC) {
- require(isActive)
+ require(!isActive) // is already cancelled
expect(2)
}
} catch (e: Exception) {
@@ -292,7 +284,6 @@
finish(3)
return
}
-
fail()
}
}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/test/guide/example-basic-06.kt b/core/kotlinx-coroutines-core/test/guide/example-basic-06.kt
index 8245772..13bdf39 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-basic-06.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-basic-06.kt
@@ -8,7 +8,7 @@
import kotlinx.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking<Unit> {
- launch {
+ GlobalScope.launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
diff --git a/core/kotlinx-coroutines-core/test/guide/example-cancel-02.kt b/core/kotlinx-coroutines-core/test/guide/example-cancel-02.kt
index 720e36f..f558abe 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-cancel-02.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-cancel-02.kt
@@ -9,7 +9,7 @@
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = timeSource.currentTimeMillis()
- val job = launch {
+ val job = launch(DefaultDispatcher) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
diff --git a/core/kotlinx-coroutines-core/test/guide/example-cancel-03.kt b/core/kotlinx-coroutines-core/test/guide/example-cancel-03.kt
index 91a6326..fe05d70 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-cancel-03.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-cancel-03.kt
@@ -9,7 +9,7 @@
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = timeSource.currentTimeMillis()
- val job = launch {
+ val job = launch(DefaultDispatcher) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
diff --git a/core/kotlinx-coroutines-core/test/guide/example-context-06.kt b/core/kotlinx-coroutines-core/test/guide/example-context-06.kt
index beaf8f0..a97741e 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-context-06.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-context-06.kt
@@ -6,13 +6,12 @@
package kotlinx.coroutines.experimental.guide.context06
import kotlinx.coroutines.experimental.*
-import kotlin.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
- val request = launch {
+ val request = GlobalScope.launch {
// it spawns two other jobs, one with its separate context
- val job1 = launch {
+ val job1 = GlobalScope.launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
diff --git a/core/kotlinx-coroutines-core/test/guide/example-context-09.kt b/core/kotlinx-coroutines-core/test/guide/example-context-09.kt
index 1620dc2..0c16a3b 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-context-09.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-context-09.kt
@@ -12,12 +12,12 @@
fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
log("Started main coroutine")
// run two background value computations
- val v1 = async(CoroutineName("v1coroutine")) {
+ val v1 = async(CoroutineName("v1coroutine") + DefaultDispatcher) {
delay(500)
log("Computing v1")
252
}
- val v2 = async(CoroutineName("v2coroutine")) {
+ val v2 = async(CoroutineName("v2coroutine") + DefaultDispatcher) {
delay(1000)
log("Computing v2")
6
diff --git a/core/kotlinx-coroutines-core/test/guide/example-exceptions-01.kt b/core/kotlinx-coroutines-core/test/guide/example-exceptions-01.kt
index 81bf496..d0822a0 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-exceptions-01.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-exceptions-01.kt
@@ -8,13 +8,13 @@
import kotlinx.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking {
- val job = launch {
+ val job = GlobalScope.launch {
println("Throwing exception from launch")
throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
}
job.join()
println("Joined failed job")
- val deferred = async {
+ val deferred = GlobalScope.async {
println("Throwing exception from async")
throw ArithmeticException() // Nothing is printed, relying on user to call await
}
diff --git a/core/kotlinx-coroutines-core/test/guide/example-exceptions-02.kt b/core/kotlinx-coroutines-core/test/guide/example-exceptions-02.kt
index acd1b92..90a97db 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-exceptions-02.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-exceptions-02.kt
@@ -11,10 +11,10 @@
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
- val job = launch(handler) {
+ val job = GlobalScope.launch(handler) {
throw AssertionError()
}
- val deferred = async(handler) {
+ val deferred = GlobalScope.async(handler) {
throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
}
joinAll(job, deferred)
diff --git a/core/kotlinx-coroutines-core/test/guide/example-exceptions-04.kt b/core/kotlinx-coroutines-core/test/guide/example-exceptions-04.kt
index 02277e2..16faec7 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-exceptions-04.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-exceptions-04.kt
@@ -6,13 +6,12 @@
package kotlinx.coroutines.experimental.guide.exceptions04
import kotlinx.coroutines.experimental.*
-import kotlin.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
- val job = launch(handler) {
+ val job = GlobalScope.launch(handler) {
val child1 = launch(coroutineContext, start = CoroutineStart.ATOMIC) {
try {
delay(Long.MAX_VALUE)
diff --git a/core/kotlinx-coroutines-core/test/guide/example-exceptions-06.kt b/core/kotlinx-coroutines-core/test/guide/example-exceptions-06.kt
index e70443b..0cbcd34 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-exceptions-06.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-exceptions-06.kt
@@ -6,14 +6,13 @@
package kotlinx.coroutines.experimental.guide.exceptions06
import kotlinx.coroutines.experimental.*
-import kotlin.coroutines.experimental.*
import java.io.*
fun main(args: Array<String>) = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught original $exception")
}
- val job = launch(handler) {
+ val job = GlobalScope.launch(handler) {
val inner = launch(coroutineContext) {
launch(coroutineContext) {
launch(coroutineContext) {
diff --git a/core/kotlinx-coroutines-core/test/guide/example-select-05.kt b/core/kotlinx-coroutines-core/test/guide/example-select-05.kt
index fa7ae7a..b62c567 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-select-05.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-select-05.kt
@@ -8,7 +8,6 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
-import kotlin.coroutines.experimental.*
fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
diff --git a/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt b/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt
index 5193773..39d2284 100644
--- a/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt
+++ b/core/kotlinx-coroutines-core/test/guide/example-sync-04.kt
@@ -6,15 +6,15 @@
package kotlinx.coroutines.experimental.guide.sync04
import kotlinx.coroutines.experimental.*
-import kotlin.system.*
import kotlin.coroutines.experimental.*
+import kotlin.system.*
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
- launch(context) {
+ launch(context) {
repeat(k) { action() }
}
}
diff --git a/integration/kotlinx-coroutines-slf4j/test/MDCContextTest.kt b/integration/kotlinx-coroutines-slf4j/test/MDCContextTest.kt
index 9838871..9073625 100644
--- a/integration/kotlinx-coroutines-slf4j/test/MDCContextTest.kt
+++ b/integration/kotlinx-coroutines-slf4j/test/MDCContextTest.kt
@@ -26,7 +26,7 @@
fun testContextIsNotPassedByDefaultBetweenCoroutines() = runTest {
expect(1)
MDC.put("myKey", "myValue")
- launch {
+ GlobalScope.launch {
assertEquals(null, MDC.get("myKey"))
expect(2)
}.join()
diff --git a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
index fa03d75..c6803e5 100644
--- a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
@@ -9,15 +9,14 @@
import kotlinx.coroutines.experimental.reactive.*
import org.junit.*
import org.junit.Assert.*
-import kotlin.coroutines.experimental.*
class ConvertTest : TestBase() {
class TestException(s: String): RuntimeException(s)
@Test
- fun testJobToMonoSuccess() = runBlocking<Unit> {
+ fun testJobToMonoSuccess() = runBlocking {
expect(1)
- val job = launch(coroutineContext) {
+ val job = launch {
expect(3)
}
val mono = job.asMono(coroutineContext)
@@ -30,9 +29,9 @@
}
@Test
- fun testJobToMonoFail() = runBlocking<Unit> {
+ fun testJobToMonoFail() = runBlocking {
expect(1)
- val job = async(coroutineContext + NonCancellable) { // don't kill parent on exception
+ val job = async(NonCancellable) { // don't kill parent on exception
expect(3)
throw RuntimeException("OK")
}
@@ -76,7 +75,7 @@
@Test
fun testDeferredToMonoFail() {
- val d = async {
+ val d = GlobalScope.async {
delay(50)
throw TestException("OK")
}
@@ -92,7 +91,7 @@
@Test
fun testToFlux() {
- val c = produce(DefaultDispatcher) {
+ val c = GlobalScope.produce {
delay(50)
send("O")
delay(50)
@@ -106,7 +105,7 @@
@Test
fun testToFluxFail() {
- val c = produce(DefaultDispatcher) {
+ val c = GlobalScope.produce {
delay(50)
send("O")
delay(50)
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt
index 63e392b..6dfae53 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt
@@ -5,18 +5,16 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package kotlinx.coroutines.experimental.rx2.guide.basic07
-import io.reactivex.subjects.BehaviorSubject
-import kotlinx.coroutines.experimental.Unconfined
-import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.rx2.consumeEach
+import io.reactivex.subjects.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.rx2.*
fun main(args: Array<String>) = runBlocking<Unit> {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two")
// now launch a coroutine to print everything
- launch(Unconfined) { // launch coroutine in unconfined context
+ GlobalScope.launch(Unconfined) { // launch coroutine in unconfined context
subject.consumeEach { println(it) }
}
subject.onNext("three")