Reactive scopeless (#1341)
Make all reactive builders top-level functions instead of extensions on CoroutineScope and prohibit jobs in their context
Downsides of having lifecycle-managed scoped builders:
* The lifecycle of semantically cold entity is managed externally by the hot-one.
* Independent failures in independent triggered computations affect each other
* Two cancellation sources should be managed, coroutine-related Job parent and disposable/subscription
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 484f385..0dda7e5 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -437,6 +437,7 @@
public fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
public fun start ()Z
+ public fun toString ()Ljava/lang/String;
}
public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/ChildHandle, kotlinx/coroutines/DisposableHandle {
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
index 2afa313..43aec89 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
@@ -20,8 +20,11 @@
}
public final class kotlinx/coroutines/reactive/PublishKt {
+ public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
+ public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
+ public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
}
public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
index 8afd014..6534bfb 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
@@ -6,12 +6,16 @@
}
public final class kotlinx/coroutines/reactor/FluxKt {
+ public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
+ public static synthetic fun flux$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Flux;
public static synthetic fun flux$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Flux;
}
public final class kotlinx/coroutines/reactor/MonoKt {
+ public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
public static final fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
+ public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
}
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
index 67ef8a1..54a9663 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt
@@ -21,7 +21,9 @@
}
public final class kotlinx/coroutines/rx2/RxCompletableKt {
+ public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable;
public static final fun rxCompletable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable;
+ public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable;
public static synthetic fun rxCompletable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable;
}
@@ -35,17 +37,23 @@
}
public final class kotlinx/coroutines/rx2/RxFlowableKt {
+ public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable;
public static final fun rxFlowable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable;
+ public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static synthetic fun rxFlowable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable;
}
public final class kotlinx/coroutines/rx2/RxMaybeKt {
+ public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe;
public static final fun rxMaybe (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe;
+ public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe;
public static synthetic fun rxMaybe$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe;
}
public final class kotlinx/coroutines/rx2/RxObservableKt {
+ public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable;
public static final fun rxObservable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable;
+ public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable;
public static synthetic fun rxObservable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable;
}
@@ -54,7 +62,9 @@
}
public final class kotlinx/coroutines/rx2/RxSingleKt {
+ public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single;
public static final fun rxSingle (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single;
+ public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single;
public static synthetic fun rxSingle$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single;
}
diff --git a/kotlinx-coroutines-core/common/src/NonCancellable.kt b/kotlinx-coroutines-core/common/src/NonCancellable.kt
index 3a4faee..c48faea 100644
--- a/kotlinx-coroutines-core/common/src/NonCancellable.kt
+++ b/kotlinx-coroutines-core/common/src/NonCancellable.kt
@@ -115,4 +115,9 @@
*/
@InternalCoroutinesApi
override fun attachChild(child: ChildJob): ChildHandle = NonDisposableHandle
+
+ /** @suppress */
+ override fun toString(): String {
+ return "NonCancellable"
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt
index 0a10913..073c7a5 100644
--- a/kotlinx-coroutines-core/jvm/test/TestBase.kt
+++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt
@@ -9,6 +9,7 @@
import org.junit.*
import java.util.*
import java.util.concurrent.atomic.*
+import kotlin.coroutines.*
import kotlin.test.*
private val VERBOSE = systemProp("test.verbose", false)
@@ -213,4 +214,6 @@
assertTrue(result.exceptionOrNull() is T, "Expected ${T::class}, but had $result")
return result.exceptionOrNull()!! as T
}
+
+ protected suspend fun currentDispatcher() = coroutineContext[ContinuationInterceptor]!!
}
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index 1da4250..0eff27b 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -617,7 +617,7 @@
context: CoroutineContext, // the context to execute this coroutine in
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
-) = GlobalScope.publish<R>(context) {
+) = publish<R>(context) {
collect { // collect the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
@@ -638,7 +638,7 @@
```kotlin
fun main() = runBlocking<Unit> {
range(1, 5)
- .fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
+ .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" })
.collect { println(it) } // print all the resulting strings
}
```
@@ -673,7 +673,7 @@
-->
```kotlin
-fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = GlobalScope.publish<T>(context) {
+fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
val current = this
other.openSubscription().consume { // explicitly open channel to Publisher<U>
@@ -711,7 +711,7 @@
fun main() = runBlocking<Unit> {
val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms
- slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it
+ slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it
}
```
@@ -742,7 +742,7 @@
-->
```kotlin
-fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
+fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
collect { pub -> // for each publisher collected
launch { // launch a child coroutine
pub.collect { send(it) } // resend all element from this publisher
@@ -783,7 +783,7 @@
```kotlin
fun main() = runBlocking<Unit> {
- testPub().merge(coroutineContext).collect { println(it) } // print the whole stream
+ testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream
}
```
@@ -865,7 +865,7 @@
-->
```kotlin
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
@@ -915,7 +915,7 @@
-->
```kotlin
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
@@ -1067,12 +1067,12 @@
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.reactive -->
-[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html
+[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
-[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-flowable.html
+[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
<!--- END -->
diff --git a/reactive/kotlinx-coroutines-reactive/README.md b/reactive/kotlinx-coroutines-reactive/README.md
index d7746e1..69691e8 100644
--- a/reactive/kotlinx-coroutines-reactive/README.md
+++ b/reactive/kotlinx-coroutines-reactive/README.md
@@ -33,7 +33,7 @@
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.reactive -->
-[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html
+[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first.html
[org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-default.html
[org.reactivestreams.Publisher.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-else.html
diff --git a/reactive/kotlinx-coroutines-reactive/src/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/Convert.kt
index 2be24af..a7ae128 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Convert.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Convert.kt
@@ -21,7 +21,7 @@
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
@ObsoleteCoroutinesApi
-public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = GlobalScope.publish(context) {
+public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = publish(context) {
for (t in this@asPublisher)
send(t)
}
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index f5ea01e..843c94c 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -1,7 +1,9 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.reactive
import kotlinx.atomicfu.*
@@ -11,6 +13,7 @@
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
import kotlin.coroutines.*
+import kotlin.internal.LowPriorityInOverloadResolution
/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
@@ -26,25 +29,44 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
- *
- * @param context context of the coroutine.
- * @param block the coroutine code.
*/
@ExperimentalCoroutinesApi
+public fun <T> publish(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Publisher<T> {
+ require(context[Job] === null) { "Publisher context cannot contain job in it." +
+ "Its lifecycle should be managed via subscription. Had $context" }
+ return publishInternal(GlobalScope, context, block)
+}
+
+@Deprecated(
+ message = "CoroutineScope.publish is deprecated in favour of top-level publish",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("publish(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
+@LowPriorityInOverloadResolution
public fun <T> CoroutineScope.publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Publisher<T> = publishInternal(this, context, block)
+
+/** @suppress For internal use from other reactive integration modules only */
+@InternalCoroutinesApi
+public fun <T> publishInternal(
+ scope: CoroutineScope, // support for legacy publish in scope
+ context: CoroutineContext,
+ block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
- val newContext = newCoroutineContext(context)
+ val newContext = scope.newCoroutineContext(context)
val coroutine = PublisherCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
index ca18349..aaeaa00 100644
--- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
@@ -20,7 +20,7 @@
) : TestBase() {
enum class Ctx {
- MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context },
+ MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
@@ -39,7 +39,7 @@
@Test
fun testEmpty(): Unit = runBlocking {
- val pub = CoroutineScope(ctx(coroutineContext)).publish<String> {
+ val pub = publish<String>(ctx(coroutineContext)) {
if (delay) delay(1)
// does not send anything
}
@@ -77,7 +77,7 @@
@Test
fun testNumbers() = runBlocking<Unit> {
val n = 100 * stressTestMultiplier
- val pub = CoroutineScope(ctx(coroutineContext)).publish {
+ val pub = publish(ctx(coroutineContext)) {
for (i in 1..n) {
send(i)
if (delay) delay(1)
@@ -99,8 +99,7 @@
fun testCancelWithoutValue() = runTest {
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
publish<String> {
- yield()
- expectUnreached()
+ hang {}
}.awaitFirst()
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishParentCancelStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishParentCancelStressTest.kt
deleted file mode 100644
index 5936712..0000000
--- a/reactive/kotlinx-coroutines-reactive/test/PublishParentCancelStressTest.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.reactive
-
-import kotlinx.coroutines.*
-import org.junit.*
-import org.junit.Test
-import org.reactivestreams.*
-import java.util.concurrent.*
-import kotlin.test.*
-
-public class PublishParentCancelStressTest : TestBase() {
- private val dispatcher = newFixedThreadPoolContext(3, "PublishParentCancelStressTest")
- private val N_TIMES = 5000 * stressTestMultiplier
-
- @After
- fun tearDown() {
- dispatcher.close()
- }
-
- @Test
- fun testStress() = runTest {
- var unhandled: Throwable? = null
- val handler = CoroutineExceptionHandler { _, ex -> unhandled = ex }
- repeat(N_TIMES) {
- val barrier = CyclicBarrier(4)
- // launch parent job for publisher
- val parent = GlobalScope.async<Unit>(dispatcher + handler) {
- val publisher = publish<Unit> {
- // BARRIER #1 - child publisher crashes
- barrier.await()
- throw TestException()
- }
- var sub: Subscription? = null
- publisher.subscribe(object : Subscriber<Unit> {
- override fun onComplete() { error("Cannot be reached") }
- override fun onSubscribe(s: Subscription?) { sub = s }
- override fun onNext(t: Unit?) { error("Cannot be reached" ) }
- override fun onError(t: Throwable?) {
- assertTrue(t is TestException)
- }
- })
- launch {
- // BARRIER #3 -- cancel subscription
- barrier.await()
- sub!!.cancel()
- }
- // BARRIER #2 -- parent completes
- barrier.await()
- Unit
- }
- // BARRIE #4 - go 1-3 together
- barrier.await()
- // Make sure exception is not lost, but incorporated into parent
- val result = kotlin.runCatching { parent.await() }
- assertTrue(result.exceptionOrNull() is TestException)
- // Make sure unhandled exception handler was not invoked
- assertNull(unhandled)
- }
- }
-
- private class TestException : Exception()
-}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
index e022ff1..4ffa074 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
@@ -14,7 +14,7 @@
@Test
fun testBasicEmpty() = runTest {
expect(1)
- val publisher = publish<Int> {
+ val publisher = publish<Int>(currentDispatcher()) {
expect(5)
}
expect(2)
@@ -32,7 +32,7 @@
@Test
fun testBasicSingle() = runTest {
expect(1)
- val publisher = publish {
+ val publisher = publish(currentDispatcher()) {
expect(5)
send(42)
expect(7)
@@ -58,7 +58,7 @@
@Test
fun testBasicError() = runTest {
expect(1)
- val publisher = publish<Int>(NonCancellable) {
+ val publisher = publish<Int>(currentDispatcher()) {
expect(5)
throw RuntimeException("OK")
}
@@ -82,23 +82,14 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- publish<Unit> {
- throw RuntimeException("OK")
- }.openSubscription()
- }
-
- @Test
- fun testHandleFailureAfterCancel() = runTest(
- unhandled = listOf({ it -> it is RuntimeException && it.message == "FAILED" })
- ){
+ fun testHandleFailureAfterCancel() = runTest {
expect(1)
- // Exception should be delivered to CoroutineExceptionHandler, because we create publisher
- // with the NonCancellable parent
- val publisher = publish<Unit>(NonCancellable + Dispatchers.Unconfined) {
+
+ val eh = CoroutineExceptionHandler { _, t ->
+ assertTrue(t is RuntimeException)
+ expect(6)
+ }
+ val publisher = publish<Unit>(Dispatchers.Unconfined + eh) {
try {
expect(3)
delay(10000)
@@ -128,95 +119,13 @@
})
expect(4)
sub!!.cancel()
- finish(6)
- }
-
- @Test
- fun testParentHandlesFailure() = runTest {
- expect(1)
- val deferred = CompletableDeferred<Unit>()
- val publisher = publish<Unit>(deferred + Dispatchers.Unconfined) {
- try {
- expect(3)
- delay(10000)
- } finally {
- expect(5)
- throw TestException("FAILED")
- }
- }
- var sub: Subscription? = null
- publisher.subscribe(object : Subscriber<Unit> {
- override fun onComplete() {
- expectUnreached()
- }
-
- override fun onSubscribe(s: Subscription) {
- expect(2)
- sub = s
- }
-
- override fun onNext(t: Unit?) {
- expectUnreached()
- }
-
- override fun onError(t: Throwable?) {
- expectUnreached()
- }
- })
- expect(4)
- sub!!.cancel()
-
- try {
- deferred.await()
- expectUnreached()
- } catch (e: TestException) {
- expect(6)
- }
-
finish(7)
}
@Test
- fun testPublishFailureCancelsParent() = runTest(
- expected = { it is TestException }
- ) {
- expect(1)
- val publisher = publish<Unit> {
- expect(5)
- throw TestException()
- }
- expect(2)
- publisher.subscribe(object : Subscriber<Unit> {
- override fun onComplete() {
- expectUnreached()
- }
-
- override fun onSubscribe(s: Subscription) {
- expect(3)
- }
-
- override fun onNext(t: Unit?) {
- expectUnreached()
- }
-
- override fun onError(t: Throwable?) {
- assertTrue(t is TestException)
- expect(6)
- }
- })
- expect(4)
- try {
- yield() // to coroutine, will crash because it is a cancelled parent coroutine
- } finally {
- finish(7)
- }
- expectUnreached()
- }
-
- @Test
fun testOnNextError() = runTest {
expect(1)
- val publisher = publish<String>(NonCancellable) {
+ val publisher = publish(currentDispatcher()) {
expect(4)
try {
send("OK")
@@ -255,7 +164,7 @@
@Test
fun testFailingConsumer() = runTest {
- val pub = publish {
+ val pub = publish(currentDispatcher()) {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
@@ -269,4 +178,9 @@
finish(3)
}
}
+
+ @Test
+ fun testIllegalArgumentException() {
+ assertFailsWith<IllegalArgumentException> { publish<Int>(Job()) { } }
+ }
}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt
index 503a015..258632b 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactive
@@ -12,7 +12,7 @@
@Test
fun testCancelWhileBPSuspended() = runBlocking {
expect(1)
- val observable = publish {
+ val observable = publish(currentDispatcher()) {
expect(5)
send("A") // will not suspend, because an item was requested
expect(7)
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
index cac2f55..e238d39 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
@@ -13,7 +13,7 @@
@Test
fun testConcurrentStress() = runBlocking {
val n = 10_000 * stressTestMultiplier
- val observable = GlobalScope.publish {
+ val observable = publish {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
diff --git a/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt
index 2e55b8a..6816a98 100644
--- a/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt
@@ -26,29 +26,17 @@
private val dispatcher: Dispatcher
) : PublisherVerification<Long>(TestEnvironment(500, 500)) {
- private val scope = CoroutineScope(dispatcher.dispatcher + NonCancellable)
-
override fun createPublisher(elements: Long): Publisher<Long> =
- scope.publish {
+ publish(dispatcher.dispatcher) {
for (i in 1..elements) send(i)
}
override fun createFailedPublisher(): Publisher<Long> =
- scope.publish {
+ publish(dispatcher.dispatcher) {
throw TestException()
}
@Test
- public override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() {
- // This test fails on default dispatcher because it retains a reference to the last task
- // in the structure of its GlobalQueue
- // So we skip it with the default dispatcher.
- // todo: remove it when CoroutinesScheduler is improved
- if (dispatcher == Dispatcher.DEFAULT) return
- super.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
- }
-
- @Test
public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() {
throw SkipException("Skipped")
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
index 74f5914..3f33b33 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
@@ -17,7 +17,7 @@
var onCancelled = 0
var onError = 0
- val publisher = publish {
+ val publisher = publish(currentDispatcher()) {
coroutineContext[Job]?.invokeOnCompletion {
if (it is CancellationException) ++onCancelled
}
@@ -45,7 +45,7 @@
@Test
fun testBufferSize1() = runTest {
- val publisher = publish {
+ val publisher = publish(currentDispatcher()) {
expect(1)
send(3)
@@ -66,7 +66,7 @@
@Test
fun testBufferSize10() = runTest {
- val publisher = publish {
+ val publisher = publish(currentDispatcher()) {
expect(1)
send(5)
@@ -87,7 +87,7 @@
@Test
fun testConflated() = runTest {
- val publisher = publish {
+ val publisher = publish(currentDispatcher()) {
for (i in 1..5) send(i)
}
val list = publisher.asFlow().conflate().toList()
@@ -96,7 +96,7 @@
@Test
fun testProduce() = runTest {
- val flow = publish { repeat(10) { send(it) } }.asFlow()
+ val flow = publish(currentDispatcher()) { repeat(10) { send(it) } }.asFlow()
check((0..9).toList(), flow.produceIn(this))
check((0..9).toList(), flow.buffer(2).produceIn(this))
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
@@ -113,7 +113,7 @@
fun testProduceCancellation() = runTest {
expect(1)
// publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled
- val flow = publish {
+ val flow = publish(currentDispatcher()) {
expect(3)
repeat(10) { value ->
when (value) {
diff --git a/reactive/kotlinx-coroutines-reactor/README.md b/reactive/kotlinx-coroutines-reactor/README.md
index 1a08834..1531488 100644
--- a/reactive/kotlinx-coroutines-reactor/README.md
+++ b/reactive/kotlinx-coroutines-reactor/README.md
@@ -29,8 +29,8 @@
<!--- INDEX kotlinx.coroutines.channels -->
<!--- MODULE kotlinx-coroutines-reactor -->
<!--- INDEX kotlinx.coroutines.reactor -->
-[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-coroutine-scope/mono.html
-[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-coroutine-scope/flux.html
+[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/mono.html
+[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/flux.html
[kotlinx.coroutines.Job.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-job/as-mono.html
[kotlinx.coroutines.Deferred.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-deferred/as-mono.html
[kotlinx.coroutines.channels.ReceiveChannel.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.channels.-receive-channel/as-flux.html
diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt
index ea9f882..cf6b65d 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt
@@ -22,7 +22,7 @@
* @param context -- the coroutine context from which the resulting mono is going to be signalled
*/
@ExperimentalCoroutinesApi
-public fun Job.asMono(context: CoroutineContext): Mono<Unit> = GlobalScope.mono(context) { this@asMono.join() }
+public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { this@asMono.join() }
/**
* Converts this deferred value to the hot reactive mono that signals
* [success][MonoSink.success] or [error][MonoSink.error].
@@ -36,7 +36,7 @@
* @param context -- the coroutine context from which the resulting mono is going to be signalled
*/
@ExperimentalCoroutinesApi
-public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = GlobalScope.mono(context) { this@asMono.await() }
+public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(context) { this@asMono.await() }
/**
* Converts a stream of elements received from the channel to the hot reactive flux.
@@ -50,7 +50,7 @@
* @param context -- the coroutine context from which the resulting flux is going to be signalled
*/
@ObsoleteCoroutinesApi
-public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> = GlobalScope.flux(context) {
+public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> = flux(context) {
for (t in this@asFlux)
send(t)
}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 3495501..785b465 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -1,7 +1,9 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
@@ -9,16 +11,15 @@
import kotlinx.coroutines.reactive.*
import reactor.core.publisher.*
import kotlin.coroutines.*
+import kotlin.internal.LowPriorityInOverloadResolution
/**
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
@@ -29,12 +30,29 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ *
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*/
@ExperimentalCoroutinesApi
-fun <T> CoroutineScope.flux(
+public fun <T> flux(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Flux<T> {
+ require(context[Job] === null) { "Flux context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return Flux.from(publishInternal(GlobalScope, context, block))
+}
+
+@Deprecated(
+ message = "CoroutineScope.flux is deprecated in favour of top-level flux",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("flux(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
+@LowPriorityInOverloadResolution
+public fun <T> CoroutineScope.flux(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flux<T> =
- Flux.from(publish(newCoroutineContext(context), block = block))
+ Flux.from(publishInternal(this, context, block))
diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
index 7174fb6..a0f65af 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
@@ -1,13 +1,16 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import reactor.core.*
import reactor.core.publisher.*
import kotlin.coroutines.*
+import kotlin.internal.*
/**
* Creates cold [mono][Mono] that will run a given [block] in a coroutine.
@@ -20,19 +23,37 @@
* | Returns a null | `success`
* | Failure with exception or unsubscribe | `error`
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine.
- * @param block the coroutine code.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
-fun <T> CoroutineScope.mono(
+public fun <T> mono(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
+): Mono<T> {
+ require(context[Job] === null) { "Mono context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return monoInternal(GlobalScope, context, block)
+}
+
+@Deprecated(
+ message = "CoroutineScope.mono is deprecated in favour of top-level mono",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("mono(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+@LowPriorityInOverloadResolution
+public fun <T> CoroutineScope.mono(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?
+): Mono<T> = monoInternal(this, context, block)
+
+private fun <T> monoInternal(
+ scope: CoroutineScope, // support for legacy mono in scope
+ context: CoroutineContext,
+ block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
- val newContext = newCoroutineContext(context)
+ val newContext = scope.newCoroutineContext(context)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
diff --git a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
index 9bd55cd..10e05b7 100644
--- a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
@@ -17,7 +17,7 @@
val job = launch {
expect(3)
}
- val mono = job.asMono(coroutineContext)
+ val mono = job.asMono(coroutineContext.minusKey(Job))
mono.subscribe {
expect(4)
}
@@ -29,11 +29,11 @@
@Test
fun testJobToMonoFail() = runBlocking {
expect(1)
- val job = async(NonCancellable) { // don't kill parent on exception
+ val job = async(NonCancellable) {
expect(3)
throw RuntimeException("OK")
}
- val mono = job.asMono(coroutineContext + NonCancellable)
+ val mono = job.asMono(coroutineContext.minusKey(Job))
mono.subscribe(
{ fail("no item should be emitted") },
{ expect(4) }
@@ -110,10 +110,10 @@
throw TestException("K")
}
val flux = c.asFlux(Dispatchers.Unconfined)
- val mono = GlobalScope.mono(Dispatchers.Unconfined) {
+ val mono = mono(Dispatchers.Unconfined) {
var result = ""
try {
- flux.consumeEach { result += it }
+ flux.collect { result += it }
} catch(e: Throwable) {
check(e is TestException)
result += e.message
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
index c4c0dbc..ae23d3c 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
@@ -15,7 +15,7 @@
@Test
fun testNumbers() {
val n = 100 * stressTestMultiplier
- val flux = GlobalScope.flux {
+ val flux = flux {
repeat(n) { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
@@ -26,7 +26,7 @@
@Test
fun testConcurrentStress() {
val n = 10_000 * stressTestMultiplier
- val flux = GlobalScope.flux {
+ val flux = flux {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
@@ -45,7 +45,7 @@
@Test
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
- val flux = GlobalScope.flux(Dispatchers.Unconfined) {
+ val flux = flux(Dispatchers.Unconfined) {
Flux.range(0, n).collect { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
@@ -56,7 +56,7 @@
@Test
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
- val flux = GlobalScope.flux {
+ val flux = flux {
Flux.range(0, n).collect { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
@@ -66,11 +66,11 @@
@Test
fun testSendAndCrash() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send("O")
throw IOException("K")
}
- val mono = GlobalScope.mono {
+ val mono = mono {
var result = ""
try {
flux.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
index 241cc6a..7d8d469 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
@@ -14,7 +14,7 @@
class FluxSingleTest {
@Test
fun testSingleNoWait() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send("OK")
}
@@ -30,7 +30,7 @@
@Test
fun testSingleEmitAndAwait() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.just("O").awaitSingle() + "K")
}
@@ -41,7 +41,7 @@
@Test
fun testSingleWithDelay() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K")
}
@@ -52,7 +52,7 @@
@Test
fun testSingleException() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.just("O", "K").awaitSingle() + "K")
}
@@ -63,7 +63,7 @@
@Test
fun testAwaitFirst() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.just("O", "#").awaitFirst() + "K")
}
@@ -74,7 +74,7 @@
@Test
fun testAwaitFirstOrDefault() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.empty<String>().awaitFirstOrDefault("O") + "K")
}
@@ -85,7 +85,7 @@
@Test
fun testAwaitFirstOrDefaultWithValues() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.just("O", "#").awaitFirstOrDefault("!") + "K")
}
@@ -96,7 +96,7 @@
@Test
fun testAwaitFirstOrNull() {
- val flux = GlobalScope.flux<String> {
+ val flux = flux<String?> {
send(Flux.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -107,7 +107,7 @@
@Test
fun testAwaitFirstOrNullWithValues() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
}
@@ -118,7 +118,7 @@
@Test
fun testAwaitFirstOrElse() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.empty<String>().awaitFirstOrElse { "O" } + "K")
}
@@ -129,7 +129,7 @@
@Test
fun testAwaitFirstOrElseWithValues() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K")
}
@@ -140,7 +140,7 @@
@Test
fun testAwaitLast() {
- val flux = GlobalScope.flux {
+ val flux = flux {
send(Flux.just("#", "O").awaitLast() + "K")
}
@@ -151,7 +151,7 @@
@Test
fun testExceptionFromObservable() {
- val flux = GlobalScope.flux {
+ val flux = flux {
try {
send(Flux.error<String>(RuntimeException("O")).awaitFirst())
} catch (e: RuntimeException) {
@@ -166,7 +166,7 @@
@Test
fun testExceptionFromCoroutine() {
- val flux = GlobalScope.flux<String> {
+ val flux = flux<String> {
error(Flux.just("O").awaitSingle() + "K")
}
@@ -178,7 +178,7 @@
@Test
fun testFluxIteration() {
- val flux = GlobalScope.flux {
+ val flux = flux {
var result = ""
Flux.just("O", "K").collect { result += it }
send(result)
@@ -191,7 +191,7 @@
@Test
fun testFluxIterationFailure() {
- val flux = GlobalScope.flux {
+ val flux = flux {
try {
Flux.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
send("Fail")
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
index a0368f8..ee26455 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
@@ -15,7 +15,7 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val flux = flux {
+ val flux = flux(currentDispatcher()) {
expect(4)
send("OK")
}
@@ -32,7 +32,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val flux = flux<String>(NonCancellable) {
+ val flux = flux<String>(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
@@ -52,7 +52,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val flux = flux<String> {
+ val flux = flux<String>(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -72,23 +72,10 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- flux<Unit> {
- throw RuntimeException("OK")
- }.subscribe(
- { expectUnreached() },
- { assert(it is RuntimeException) }
- )
- }
-
- @Test
fun testNotifyOnceOnCancellation() = runTest {
expect(1)
val observable =
- flux {
+ flux(currentDispatcher()) {
expect(5)
send("OK")
try {
@@ -124,7 +111,7 @@
@Test
fun testFailingConsumer() = runTest {
- val pub = flux {
+ val pub = flux(currentDispatcher()) {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
@@ -138,4 +125,9 @@
finish(3)
}
}
+
+ @Test
+ fun testIllegalArgumentException() {
+ assertFailsWith<IllegalArgumentException> { flux<Int>(Job()) { } }
+ }
}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
index 7c72edc..2283d45 100644
--- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
@@ -22,14 +22,14 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val mono = mono {
+ val mono = mono(currentDispatcher()) {
expect(4)
"OK"
}
expect(2)
mono.subscribe { value ->
expect(5)
- Assert.assertThat(value, IsEqual("OK"))
+ assertThat(value, IsEqual("OK"))
}
expect(3)
yield() // to started coroutine
@@ -39,7 +39,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val mono = mono(NonCancellable) {
+ val mono = mono(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
@@ -48,8 +48,8 @@
expectUnreached()
}, { error ->
expect(5)
- Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
- Assert.assertThat(error.message, IsEqual("OK"))
+ assertThat(error, IsInstanceOf(RuntimeException::class.java))
+ assertThat(error.message, IsEqual("OK"))
})
expect(3)
yield() // to started coroutine
@@ -59,7 +59,7 @@
@Test
fun testBasicEmpty() = runBlocking {
expect(1)
- val mono = mono {
+ val mono = mono(currentDispatcher()) {
expect(4)
null
}
@@ -75,7 +75,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val mono = mono {
+ val mono = mono(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -97,7 +97,7 @@
@Test
fun testMonoNoWait() {
- val mono = GlobalScope.mono {
+ val mono = mono {
"OK"
}
@@ -113,7 +113,7 @@
@Test
fun testMonoEmitAndAwait() {
- val mono = GlobalScope.mono {
+ val mono = mono {
Mono.just("O").awaitSingle() + "K"
}
@@ -124,7 +124,7 @@
@Test
fun testMonoWithDelay() {
- val mono = GlobalScope.mono {
+ val mono = mono {
Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
}
@@ -135,7 +135,7 @@
@Test
fun testMonoException() {
- val mono = GlobalScope.mono {
+ val mono = mono {
Flux.just("O", "K").awaitSingle() + "K"
}
@@ -146,7 +146,7 @@
@Test
fun testAwaitFirst() {
- val mono = GlobalScope.mono {
+ val mono = mono {
Flux.just("O", "#").awaitFirst() + "K"
}
@@ -157,7 +157,7 @@
@Test
fun testAwaitLast() {
- val mono = GlobalScope.mono {
+ val mono = mono {
Flux.just("#", "O").awaitLast() + "K"
}
@@ -168,7 +168,7 @@
@Test
fun testExceptionFromFlux() {
- val mono = GlobalScope.mono {
+ val mono = mono {
try {
Flux.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
@@ -183,7 +183,7 @@
@Test
fun testExceptionFromCoroutine() {
- val mono = GlobalScope.mono<String> {
+ val mono = mono<String> {
throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
}
@@ -194,21 +194,8 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- mono<Unit> {
- throw RuntimeException("OK")
- }.subscribe(
- { expectUnreached() },
- { assert(it is RuntimeException) }
- )
- }
-
- @Test
fun testSuppressedException() = runTest {
- val mono = mono(NonCancellable) {
+ val mono = mono(currentDispatcher()) {
launch(start = CoroutineStart.ATOMIC) {
throw TestException() // child coroutine fails
}
@@ -227,12 +214,14 @@
}
@Test
- fun testUnhandledException() = runTest(
- unhandled = listOf { it -> it is TestException }
- ) {
+ fun testUnhandledException() = runTest {
expect(1)
var subscription: Subscription? = null
- val mono = mono(NonCancellable) {
+ val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t ->
+ assertTrue(t is TestException)
+ expect(5)
+
+ }) {
expect(4)
subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
try {
@@ -252,6 +241,11 @@
})
expect(3)
yield() // run coroutine
- finish(5)
+ finish(6)
+ }
+
+ @Test
+ fun testIllegalArgumentException() {
+ assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md
index f1079b6..fbdf1b3 100644
--- a/reactive/kotlinx-coroutines-rx2/README.md
+++ b/reactive/kotlinx-coroutines-rx2/README.md
@@ -51,11 +51,11 @@
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
-[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-completable.html
-[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-maybe.html
-[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-single.html
-[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-observable.html
-[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-flowable.html
+[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-completable.html
+[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-maybe.html
+[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html
+[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-observable.html
+[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
index f44ecf7..61046f2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
@@ -1,12 +1,15 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
+import kotlin.internal.*
/**
* Creates cold [Completable] that runs a given [block] in a coroutine.
@@ -18,19 +21,36 @@
* | Completes successfully | `onCompleted`
* | Failure with exception or unsubscribe | `onError`
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
- *
- * @param context context of the coroutine.
- * @param block the coroutine code.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
+public fun rxCompletable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> Unit
+): Completable {
+ require(context[Job] === null) { "Completable context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxCompletableInternal(GlobalScope, context, block)
+}
+
+@Deprecated(
+ message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("rxCompletable(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+@LowPriorityInOverloadResolution
public fun CoroutineScope.rxCompletable(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Unit
+): Completable = rxCompletableInternal(this, context, block)
+
+private fun rxCompletableInternal(
+ scope: CoroutineScope, // support for legacy rxCompletable in scope
+ context: CoroutineContext,
+ block: suspend CoroutineScope.() -> Unit
): Completable = Completable.create { subscriber ->
- val newContext = newCoroutineContext(context)
+ val newContext = scope.newCoroutineContext(context)
val coroutine = RxCompletableCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index dbf29f1..d5678de 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -25,7 +25,7 @@
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
@ExperimentalCoroutinesApi
-public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalScope.rxCompletable(context) {
+public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
this@asCompletable.join()
}
@@ -42,7 +42,7 @@
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
@ExperimentalCoroutinesApi
-public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = GlobalScope.rxMaybe(context) {
+public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
this@asMaybe.await()
}
@@ -59,7 +59,7 @@
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
@ExperimentalCoroutinesApi
-public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
+public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
this@asSingle.await()
}
@@ -75,7 +75,7 @@
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
@ObsoleteCoroutinesApi
-public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = GlobalScope.rxObservable(context) {
+public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
for (t in this@asObservable)
send(t)
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
index 93d6079..beee40e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
@@ -1,7 +1,9 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.rx2
import io.reactivex.*
@@ -9,6 +11,7 @@
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import kotlin.coroutines.*
+import kotlin.internal.*
/**
* Creates cold [flowable][Flowable] that will run a given [block] in a coroutine.
@@ -24,19 +27,29 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
- * to cancellation and error handling may change in the future.
- *
- * @param context context of the coroutine.
- * @param block the coroutine code.
*/
@ExperimentalCoroutinesApi
+public fun <T: Any> rxFlowable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Flowable<T> {
+ require(context[Job] === null) { "Flowable context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return Flowable.fromPublisher(publishInternal(GlobalScope, context, block))
+}
+
+@Deprecated(
+ message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("rxFlowable(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+@LowPriorityInOverloadResolution
public fun <T: Any> CoroutineScope.rxFlowable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Flowable<T> = Flowable.fromPublisher(publish(newCoroutineContext(context), block = block))
+): Flowable<T> = Flowable.fromPublisher(publishInternal(this, context, block))
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
index 3e3f13b..e93ae6b 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
@@ -1,12 +1,15 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
+import kotlin.internal.*
/**
* Creates cold [maybe][Maybe] that will run a given [block] in a coroutine.
@@ -19,19 +22,36 @@
* | Returns a null | `onComplete`
* | Failure with exception or unsubscribe | `onError`
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
- *
- * @param context context of the coroutine.
- * @param block the coroutine code.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
+public fun <T> rxMaybe(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?
+): Maybe<T> {
+ require(context[Job] === null) { "Maybe context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxMaybeInternal(GlobalScope, context, block)
+}
+
+@Deprecated(
+ message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("rxMaybe(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+@LowPriorityInOverloadResolution
public fun <T> CoroutineScope.rxMaybe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
+): Maybe<T> = rxMaybeInternal(this, context, block)
+
+private fun <T> rxMaybeInternal(
+ scope: CoroutineScope, // support for legacy rxMaybe in scope
+ context: CoroutineContext,
+ block: suspend CoroutineScope.() -> T?
): Maybe<T> = Maybe.create { subscriber ->
- val newContext = newCoroutineContext(context)
+ val newContext = scope.newCoroutineContext(context)
val coroutine = RxMaybeCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index f78f5ea..35176f1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -1,7 +1,9 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.rx2
import io.reactivex.*
@@ -11,6 +13,7 @@
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
+import kotlin.internal.*
/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -26,23 +29,37 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
- *
- * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
- * to cancellation and error handling may change in the future.
- *
- * @param context context of the coroutine.
- * @param block the coroutine code.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
@ExperimentalCoroutinesApi
+public fun <T : Any> rxObservable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> {
+ require(context[Job] === null) { "Observable context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxObservableInternal(GlobalScope, context, block)
+}
+
+@Deprecated(
+ message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("rxObservable(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+@LowPriorityInOverloadResolution
public fun <T : Any> CoroutineScope.rxObservable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> = rxObservableInternal(this, context, block)
+
+private fun <T : Any> rxObservableInternal(
+ scope: CoroutineScope, // support for legacy rxObservable in scope
+ context: CoroutineContext,
+ block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
- val newContext = newCoroutineContext(context)
+ val newContext = scope.newCoroutineContext(context)
val coroutine = RxObservableCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
index 53992d4..e382bbe 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
@@ -1,12 +1,15 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
+import kotlin.internal.*
/**
* Creates cold [single][Single] that will run a given [block] in a coroutine.
@@ -18,19 +21,36 @@
* | Returns a value | `onSuccess`
* | Failure with exception or unsubscribe | `onError`
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
- *
- * @param context context of the coroutine.
- * @param block the coroutine code.
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
+public fun <T : Any> rxSingle(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T
+): Single<T> {
+ require(context[Job] === null) { "Single context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return rxSingleInternal(GlobalScope, context, block)
+}
+
+@Deprecated(
+ message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("rxSingle(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+@LowPriorityInOverloadResolution
public fun <T : Any> CoroutineScope.rxSingle(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T
+): Single<T> = rxSingleInternal(this, context, block)
+
+private fun <T : Any> rxSingleInternal(
+ scope: CoroutineScope, // support for legacy rxSingle in scope
+ context: CoroutineContext,
+ block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
- val newContext = newCoroutineContext(context)
+ val newContext = scope.newCoroutineContext(context)
val coroutine = RxSingleCoroutine(newContext, subscriber)
subscriber.setCancellable(RxCancellable(coroutine))
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
index a11807c..fd15964 100644
--- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
@@ -15,7 +15,7 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val completable = rxCompletable {
+ val completable = rxCompletable(currentDispatcher()) {
expect(4)
}
expect(2)
@@ -30,7 +30,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val completable = rxCompletable(NonCancellable) {
+ val completable = rxCompletable(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
@@ -50,7 +50,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val completable = rxCompletable {
+ val completable = rxCompletable(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -73,7 +73,7 @@
@Test
fun testAwaitSuccess() = runBlocking {
expect(1)
- val completable = rxCompletable {
+ val completable = rxCompletable(currentDispatcher()) {
expect(3)
}
expect(2)
@@ -84,7 +84,7 @@
@Test
fun testAwaitFailure() = runBlocking {
expect(1)
- val completable = rxCompletable(NonCancellable) {
+ val completable = rxCompletable(currentDispatcher()) {
expect(3)
throw RuntimeException("OK")
}
@@ -99,21 +99,8 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- rxCompletable {
- throw RuntimeException("OK")
- }.subscribe(
- { expectUnreached() },
- { assert(it is RuntimeException) }
- )
- }
-
- @Test
fun testSuppressedException() = runTest {
- val completable = rxCompletable(NonCancellable) {
+ val completable = rxCompletable(currentDispatcher()) {
launch(start = CoroutineStart.ATOMIC) {
throw TestException() // child coroutine fails
}
@@ -132,12 +119,14 @@
}
@Test
- fun testUnhandledException() = runTest(
- unhandled = listOf { it -> it is TestException }
- ) {
+ fun testUnhandledException() = runTest() {
expect(1)
var disposable: Disposable? = null
- val completable = rxCompletable(NonCancellable) {
+ val eh = CoroutineExceptionHandler { _, t ->
+ assertTrue(t is TestException)
+ expect(5)
+ }
+ val completable = rxCompletable(currentDispatcher() + eh) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
@@ -156,6 +145,6 @@
})
expect(3)
yield() // run coroutine
- finish(5)
+ finish(6)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
index 475ee57..ba14b89 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
@@ -16,7 +16,7 @@
val job = launch {
expect(3)
}
- val completable = job.asCompletable(coroutineContext)
+ val completable = job.asCompletable(coroutineContext.minusKey(Job))
completable.subscribe {
expect(4)
}
@@ -32,7 +32,7 @@
expect(3)
throw RuntimeException("OK")
}
- val completable = job.asCompletable(coroutineContext)
+ val completable = job.asCompletable(coroutineContext.minusKey(Job))
completable.subscribe {
expect(4)
}
@@ -140,7 +140,7 @@
throw TestException("K")
}
val observable = c.asObservable(Dispatchers.Unconfined)
- val single = GlobalScope.rxSingle(Dispatchers.Unconfined) {
+ val single = rxSingle(Dispatchers.Unconfined) {
var result = ""
try {
observable.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
index 543de09..aebf999 100644
--- a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
@@ -15,7 +15,7 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val observable = rxFlowable {
+ val observable = rxFlowable(currentDispatcher()) {
expect(4)
send("OK")
}
@@ -32,7 +32,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val observable = rxFlowable<String>(NonCancellable) {
+ val observable = rxFlowable<String>(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
@@ -52,7 +52,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val observable = rxFlowable<String> {
+ val observable = rxFlowable<String>(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -72,23 +72,10 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- rxFlowable<Unit> {
- throw RuntimeException("OK")
- }.subscribe(
- { expectUnreached() },
- { assert(it is RuntimeException) }
- )
- }
-
- @Test
fun testNotifyOnceOnCancellation() = runTest {
expect(1)
val observable =
- rxFlowable {
+ rxFlowable(currentDispatcher()) {
expect(5)
send("OK")
try {
@@ -124,7 +111,7 @@
@Test
fun testFailingConsumer() = runTest {
- val pub = rxFlowable {
+ val pub = rxFlowable(currentDispatcher()) {
repeat(3) {
expect(it + 1) // expect(1), expect(2) *should* be invoked
send(it)
diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
index 9b55e58..ca7c0ca 100644
--- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
@@ -20,7 +20,7 @@
) : TestBase() {
enum class Ctx {
- MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context },
+ MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };
@@ -58,7 +58,7 @@
@Test
fun testSingle() = runBlocking {
- val observable = CoroutineScope(ctx(coroutineContext)).rxObservable {
+ val observable = rxObservable(ctx(coroutineContext)) {
if (delay) delay(1)
send("OK")
}
@@ -101,8 +101,7 @@
fun testCancelWithoutValue() = runTest {
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
rxObservable<String> {
- yield()
- expectUnreached()
+ hang { }
}.awaitFirst()
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
index e97b1f0..5a9bac2 100644
--- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
@@ -24,7 +24,7 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val maybe = rxMaybe {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
"OK"
}
@@ -41,7 +41,7 @@
@Test
fun testBasicEmpty() = runBlocking {
expect(1)
- val maybe = rxMaybe {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
null
}
@@ -57,7 +57,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val maybe = rxMaybe(NonCancellable) {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
@@ -78,7 +78,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val maybe = rxMaybe {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -100,7 +100,7 @@
@Test
fun testMaybeNoWait() {
- val maybe = GlobalScope.rxMaybe {
+ val maybe = rxMaybe {
"OK"
}
@@ -121,7 +121,7 @@
@Test
fun testMaybeEmitAndAwait() {
- val maybe = GlobalScope.rxMaybe {
+ val maybe = rxMaybe {
Maybe.just("O").await() + "K"
}
@@ -132,7 +132,7 @@
@Test
fun testMaybeWithDelay() {
- val maybe = GlobalScope.rxMaybe {
+ val maybe = rxMaybe {
Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
}
@@ -143,7 +143,7 @@
@Test
fun testMaybeException() {
- val maybe = GlobalScope.rxMaybe {
+ val maybe = rxMaybe {
Observable.just("O", "K").awaitSingle() + "K"
}
@@ -154,7 +154,7 @@
@Test
fun testAwaitFirst() {
- val maybe = GlobalScope.rxMaybe {
+ val maybe = rxMaybe {
Observable.just("O", "#").awaitFirst() + "K"
}
@@ -165,7 +165,7 @@
@Test
fun testAwaitLast() {
- val maybe = GlobalScope.rxMaybe {
+ val maybe = rxMaybe {
Observable.just("#", "O").awaitLast() + "K"
}
@@ -176,7 +176,7 @@
@Test
fun testExceptionFromObservable() {
- val maybe = GlobalScope.rxMaybe {
+ val maybe = rxMaybe {
try {
Observable.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
@@ -191,7 +191,7 @@
@Test
fun testExceptionFromCoroutine() {
- val maybe = GlobalScope.rxMaybe<String> {
+ val maybe = rxMaybe<String> {
throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
}
@@ -202,22 +202,9 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- rxMaybe<Unit> {
- throw RuntimeException("OK")
- }.subscribe(
- { expectUnreached() },
- { assert(it is RuntimeException) }
- )
- }
-
- @Test
fun testCancelledConsumer() = runTest {
expect(1)
- val maybe = rxMaybe<Int> {
+ val maybe = rxMaybe<Int>(currentDispatcher()) {
expect(4)
try {
delay(Long.MAX_VALUE)
@@ -242,7 +229,7 @@
@Test
fun testSuppressedException() = runTest {
- val maybe = rxMaybe(NonCancellable) {
+ val maybe = rxMaybe(currentDispatcher()) {
launch(start = CoroutineStart.ATOMIC) {
throw TestException() // child coroutine fails
}
@@ -261,12 +248,14 @@
}
@Test
- fun testUnhandledException() = runTest(
- unhandled = listOf { it -> it is TestException }
- ) {
+ fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
- val maybe = rxMaybe(NonCancellable) {
+ val eh = CoroutineExceptionHandler { _, t ->
+ assertTrue(t is TestException)
+ expect(5)
+ }
+ val maybe = rxMaybe(currentDispatcher() + eh) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
@@ -286,6 +275,6 @@
})
expect(3)
yield() // run coroutine
- finish(5)
+ finish(6)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
index 9208195..75f79de 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
@@ -76,7 +76,7 @@
send("O")
throw IOException("K")
}
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
var result = ""
try {
observable.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
index 8dc7120..c71ef56 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
@@ -14,7 +14,7 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val observable = rxObservable {
+ val observable = rxObservable(currentDispatcher()) {
expect(4)
send("OK")
}
@@ -31,7 +31,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val observable = rxObservable<String>(NonCancellable) {
+ val observable = rxObservable<String>(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
@@ -51,7 +51,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val observable = rxObservable<String> {
+ val observable = rxObservable<String>(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -71,23 +71,10 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- rxObservable<Unit> {
- throw RuntimeException("OK")
- }.subscribe(
- { expectUnreached() },
- { assert(it is RuntimeException) }
- )
- }
-
- @Test
fun testNotifyOnceOnCancellation() = runTest {
expect(1)
val observable =
- rxObservable {
+ rxObservable(currentDispatcher()) {
expect(5)
send("OK")
try {
@@ -124,7 +111,7 @@
@Test
fun testFailingConsumer() = runTest {
expect(1)
- val pub = rxObservable {
+ val pub = rxObservable(currentDispatcher()) {
expect(2)
send("OK")
try {
diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
index 2ae9570..8b78616 100644
--- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
@@ -21,14 +21,14 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val single = rxSingle {
+ val single = rxSingle(currentDispatcher()) {
expect(4)
"OK"
}
expect(2)
single.subscribe { value ->
expect(5)
- Assert.assertThat(value, IsEqual("OK"))
+ assertThat(value, IsEqual("OK"))
}
expect(3)
yield() // to started coroutine
@@ -38,7 +38,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val single = rxSingle(NonCancellable) {
+ val single = rxSingle(currentDispatcher()) {
expect(4)
throw RuntimeException("OK")
}
@@ -47,8 +47,8 @@
expectUnreached()
}, { error ->
expect(5)
- Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
- Assert.assertThat(error.message, IsEqual("OK"))
+ assertThat(error, IsInstanceOf(RuntimeException::class.java))
+ assertThat(error.message, IsEqual("OK"))
})
expect(3)
yield() // to started coroutine
@@ -59,7 +59,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val single = rxSingle {
+ val single = rxSingle(currentDispatcher()) {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -82,7 +82,7 @@
@Test
fun testSingleNoWait() {
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
"OK"
}
@@ -98,7 +98,7 @@
@Test
fun testSingleEmitAndAwait() {
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
Single.just("O").await() + "K"
}
@@ -109,7 +109,7 @@
@Test
fun testSingleWithDelay() {
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
}
@@ -120,7 +120,7 @@
@Test
fun testSingleException() {
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
Observable.just("O", "K").awaitSingle() + "K"
}
@@ -131,7 +131,7 @@
@Test
fun testAwaitFirst() {
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
Observable.just("O", "#").awaitFirst() + "K"
}
@@ -142,7 +142,7 @@
@Test
fun testAwaitLast() {
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
Observable.just("#", "O").awaitLast() + "K"
}
@@ -153,7 +153,7 @@
@Test
fun testExceptionFromObservable() {
- val single = GlobalScope.rxSingle {
+ val single = rxSingle {
try {
Observable.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
@@ -168,7 +168,7 @@
@Test
fun testExceptionFromCoroutine() {
- val single = GlobalScope.rxSingle<String> {
+ val single = rxSingle<String> {
throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
}
@@ -179,21 +179,8 @@
}
@Test
- fun testCancelsParentOnFailure() = runTest(
- expected = { it is RuntimeException && it.message == "OK" }
- ) {
- // has parent, so should cancel it on failure
- rxSingle<Unit> {
- throw RuntimeException("OK")
- }.subscribe(
- { expectUnreached() },
- { assert(it is RuntimeException) }
- )
- }
-
- @Test
fun testSuppressedException() = runTest {
- val single = rxSingle(NonCancellable) {
+ val single = rxSingle(currentDispatcher()) {
launch(start = CoroutineStart.ATOMIC) {
throw TestException() // child coroutine fails
}
@@ -212,12 +199,14 @@
}
@Test
- fun testUnhandledException() = runTest(
- unhandled = listOf { it -> it is TestException }
- ) {
+ fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
- val single = rxSingle(NonCancellable) {
+ val eh = CoroutineExceptionHandler { _, t ->
+ assertTrue(t is TestException)
+ expect(5)
+ }
+ val single = rxSingle(currentDispatcher() + eh) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
@@ -236,6 +225,6 @@
})
expect(3)
yield() // run coroutine
- finish(5)
+ finish(6)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt
index 990d90a..b87849a 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt
@@ -10,7 +10,7 @@
import kotlinx.coroutines.reactive.*
import kotlin.coroutines.CoroutineContext
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt
index edaba3d..1a214ce 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt
@@ -11,7 +11,7 @@
import io.reactivex.schedulers.Schedulers
import kotlin.coroutines.CoroutineContext
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt
index 6fd997f..5f07ba4 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt
@@ -14,7 +14,7 @@
context: CoroutineContext, // the context to execute this coroutine in
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
-) = GlobalScope.publish<R>(context) {
+) = publish<R>(context) {
collect { // collect the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
@@ -27,6 +27,6 @@
fun main() = runBlocking<Unit> {
range(1, 5)
- .fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
+ .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" })
.collect { println(it) } // print all the resulting strings
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt
index b304193..818a792 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt
@@ -12,7 +12,7 @@
import org.reactivestreams.*
import kotlin.coroutines.*
-fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = GlobalScope.publish<T>(context) {
+fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
val current = this
other.openSubscription().consume { // explicitly open channel to Publisher<U>
@@ -35,5 +35,5 @@
fun main() = runBlocking<Unit> {
val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms
- slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it
+ slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt
index c57e78f..12d9c1f 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt
@@ -10,7 +10,7 @@
import org.reactivestreams.*
import kotlin.coroutines.*
-fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
+fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
collect { pub -> // for each publisher collected
launch { // launch a child coroutine
pub.collect { send(it) } // resend all element from this publisher
@@ -33,5 +33,5 @@
}
fun main() = runBlocking<Unit> {
- testPub().merge(coroutineContext).collect { println(it) } // print the whole stream
+ testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream
}