Structured concurrency for reactive modules
* Deprecated API older than 6 months removed
* Standalone builders are deprecated
* Guide and readme updated
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index b490de9..eb14e80 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -96,7 +96,7 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
// create a channel that produces numbers from 1 to 3 with 200ms delays between them
- val source = produce<Int>(coroutineContext) {
+ val source = produce<Int> {
println("Begin") // mark the beginning of this coroutine in output
for (x in 1..3) {
delay(200) // wait for 200ms
@@ -153,7 +153,7 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
// create a publisher that produces numbers from 1 to 3 with 200ms delays between them
- val source = publish<Int>(coroutineContext) {
+ val source = publish<Int> {
// ^^^^^^^ <--- Difference from the previous examples is here
println("Begin") // mark the beginning of this coroutine in output
for (x in 1..3) {
@@ -343,7 +343,7 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
// coroutine -- fast producer of elements in the context of the main thread
- val source = rxFlowable(coroutineContext) {
+ val source = rxFlowable {
for (x in 1..3) {
send(x) // this is a suspending function
println("Sent $x") // print after successfully sent item
@@ -476,7 +476,7 @@
subject.onNext("one")
subject.onNext("two")
// now launch a coroutine to print the most recent update
- launch(coroutineContext) { // use the context of the main thread for a coroutine
+ launch { // use the context of the main thread for a coroutine
subject.consumeEach { println(it) }
}
subject.onNext("three")
@@ -512,7 +512,7 @@
broadcast.offer("one")
broadcast.offer("two")
// now launch a coroutine to print the most recent update
- launch(coroutineContext) { // use the context of the main thread for a coroutine
+ launch { // use the context of the main thread for a coroutine
broadcast.consumeEach { println(it) }
}
broadcast.offer("three")
@@ -569,12 +569,12 @@
-->
```kotlin
-fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) send(x)
}
```
-In this code `CoroutineContext` is used instead of an `Executor` and all the backpressure aspects are taken care
+In this code `CoroutineScope` and `context` are used instead of an `Executor` and all the backpressure aspects are taken care
of by the coroutines machinery. Note, that this implementation depends only on the small reactive streams library
that defines `Publisher` interface and its friends.
@@ -582,7 +582,8 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- range(CommonPool, 1, 5).consumeEach { println(it) }
+ // Range inherits parent job from runBlocking, but overrides dispatcher with DefaultDispatcher
+ range(DefaultDispatcher, 1, 5).consumeEach { println(it) }
}
```
@@ -620,7 +621,7 @@
context: CoroutineContext, // the context to execute this coroutine in
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
-) = publish<R>(context) {
+) = GlobalScope.publish<R>(context) {
consumeEach { // consume the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
@@ -633,14 +634,14 @@
<!--- INCLUDE
-fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.range(start: Int, count: Int) = publish<Int> {
for (x in start until start + count) send(x)
}
-->
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- range(coroutineContext, 1, 5)
+ range(1, 5)
.fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
.consumeEach { println(it) } // print all the resulting strings
}
@@ -676,7 +677,7 @@
-->
```kotlin
-fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
+fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = GlobalScope.publish<T>(context) {
this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
val current = this
other.openSubscription().consume { // explicitly open channel to Publisher<U>
@@ -701,7 +702,7 @@
(its pure-Rx implementation is shown in later sections):
```kotlin
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
@@ -713,8 +714,8 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- val slowNums = rangeWithInterval(coroutineContext, 200, 1, 10) // numbers with 200ms interval
- val stop = rangeWithInterval(coroutineContext, 500, 1, 10) // the first one after 500ms
+ val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
+ val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms
slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it
}
```
@@ -746,9 +747,9 @@
-->
```kotlin
-fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
+fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
consumeEach { pub -> // for each publisher received on the source channel
- launch(coroutineContext) { // launch a child coroutine
+ launch { // launch a child coroutine
pub.consumeEach { send(it) } // resend all element from this publisher
}
}
@@ -769,7 +770,7 @@
<!--- INCLUDE
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
@@ -778,10 +779,10 @@
-->
```kotlin
-fun testPub(context: CoroutineContext) = publish<Publisher<Int>>(context) {
- send(rangeWithInterval(context, 250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
+fun CoroutineScope.testPub() = publish<Publisher<Int>> {
+ send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
delay(100) // wait for 100 ms
- send(rangeWithInterval(context, 500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
+ send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
delay(1100) // wait for 1.1s - done in 1.2 sec after start
}
```
@@ -790,7 +791,7 @@
```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
- testPub(coroutineContext).merge(coroutineContext).consumeEach { println(it) } // print the whole stream
+ testPub().merge(coroutineContext).consumeEach { println(it) } // print the whole stream
}
```
@@ -872,7 +873,7 @@
-->
```kotlin
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
@@ -922,7 +923,7 @@
-->
```kotlin
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
@@ -1075,12 +1076,12 @@
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/while-select.html
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.experimental.reactive -->
-[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
+[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/kotlinx.coroutines.experimental.-coroutine-scope/publish.html
[org.reactivestreams.Publisher.consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/consume-each.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open-subscription.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.experimental.rx2 -->
-[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html
+[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-coroutine-scope/rx-flowable.html
<!--- END -->
diff --git a/reactive/kotlinx-coroutines-reactive/README.md b/reactive/kotlinx-coroutines-reactive/README.md
index 7bfea2f..c645b4c 100644
--- a/reactive/kotlinx-coroutines-reactive/README.md
+++ b/reactive/kotlinx-coroutines-reactive/README.md
@@ -19,7 +19,6 @@
| [Publisher.awaitLast][org.reactivestreams.Publisher.awaitFirst] | Returns the last value from the given publisher
| [Publisher.awaitSingle][org.reactivestreams.Publisher.awaitSingle] | Returns the single value from the given publisher
| [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription] | Subscribes to publisher and returns [ReceiveChannel]
-| [Publisher.iterator][org.reactivestreams.Publisher.iterator] | Subscribes to publisher and returns [ChannelIterator]
Conversion functions:
@@ -32,17 +31,15 @@
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
-[ChannelIterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel-iterator/index.html
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.experimental.reactive -->
-[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/publish.html
+[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/kotlinx.coroutines.experimental.-coroutine-scope/publish.html
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first.html
[org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-default.html
[org.reactivestreams.Publisher.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-else.html
[org.reactivestreams.Publisher.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-first-or-null.html
[org.reactivestreams.Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/await-single.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/open-subscription.html
-[org.reactivestreams.Publisher.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/org.reactivestreams.-publisher/iterator.html
[kotlinx.coroutines.experimental.channels.ReceiveChannel.asPublisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.experimental.reactive/kotlinx.coroutines.experimental.channels.-receive-channel/as-publisher.html
<!--- END -->
diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt
index a39f597..a3fe5f1 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt
@@ -28,27 +28,6 @@
openSubscription(request) as SubscriptionReceiveChannel<T>
/**
- * @suppress **Deprecated**: Renamed to [openSubscription]
- */
-@Deprecated(message = "Renamed to `openSubscription`",
- replaceWith = ReplaceWith("openSubscription()"))
-public fun <T> Publisher<T>.open(): SubscriptionReceiveChannel<T> =
- openSubscription() as SubscriptionReceiveChannel<T>
-
-/**
- * Subscribes to this [Publisher] and returns an iterator to receive elements emitted by it.
- *
- * This is a shortcut for `open().iterator()`. See [openSubscription] if you need an ability to manually
- * unsubscribe from the observable.
- */
-@Suppress("DeprecatedCallableAddReplaceWith")
-@Deprecated(message =
- "This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
- "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " +
- "Use `source.consumeEach { x -> ... }`.")
-public operator fun <T> Publisher<T>.iterator() = openSubscription().iterator()
-
-/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
*/
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) {
diff --git a/reactive/kotlinx-coroutines-reactive/src/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/Convert.kt
index 63649f2..09fd37a 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Convert.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Convert.kt
@@ -4,10 +4,10 @@
package kotlinx.coroutines.experimental.reactive
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import org.reactivestreams.Publisher
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import org.reactivestreams.*
+import kotlin.coroutines.experimental.*
/**
* Converts a stream of elements received from the channel to the hot reactive publisher.
@@ -17,14 +17,7 @@
*
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
-public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = DefaultDispatcher): Publisher<T> = publish(context) {
+public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = DefaultDispatcher): Publisher<T> = GlobalScope.publish(context) {
for (t in this@asPublisher)
send(t)
}
-
-/**
- * @suppress **Deprecated**: Renamed to [asPublisher]
- */
-@Deprecated(message = "Renamed to `asPublisher`",
- replaceWith = ReplaceWith("asPublisher(context)"))
-public fun <T> ReceiveChannel<T>.toPublisher(context: CoroutineContext): Publisher<T> = asPublisher(context)
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 66c6675..e45f021 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -26,37 +26,38 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
-
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
-public fun <T> publish(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun <T> CoroutineScope.publish(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = PublisherCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.publish] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.publish(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.reactive.publish"])
+)
public fun <T> publish(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend ProducerScope<T>.() -> Unit
-): Publisher<T> =
- publish(context, block = block)
+): Publisher<T> = GlobalScope.publish(context + (parent ?: EmptyCoroutineContext), block)
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
index aa2ac03..34b8ea5 100644
--- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
@@ -15,8 +15,8 @@
@RunWith(Parameterized::class)
class IntegrationTest(
- val ctx: Ctx,
- val delay: Boolean
+ private val ctx: Ctx,
+ private val delay: Boolean
) : TestBase() {
enum class Ctx {
@@ -39,7 +39,7 @@
@Test
fun testEmpty(): Unit = runBlocking {
- val pub = publish<String>(ctx(coroutineContext)) {
+ val pub = CoroutineScope(ctx(coroutineContext)).publish<String> {
if (delay) delay(1)
// does not send anything
}
@@ -55,8 +55,8 @@
}
@Test
- fun testSingle() = runBlocking<Unit> {
- val pub = publish<String>(ctx(coroutineContext)) {
+ fun testSingle() = runBlocking {
+ val pub = publish(ctx(coroutineContext)) {
if (delay) delay(1)
send("OK")
}
@@ -77,7 +77,7 @@
@Test
fun testNumbers() = runBlocking<Unit> {
val n = 100 * stressTestMultiplier
- val pub = publish<Int>(ctx(coroutineContext)) {
+ val pub = CoroutineScope(ctx(coroutineContext)).publish {
for (i in 1..n) {
send(i)
if (delay) delay(1)
@@ -112,7 +112,7 @@
fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
expect(1)
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
- publish<String>(coroutineContext) {
+ publish<String> {
yield()
expect(2)
// Nothing to emit
@@ -131,7 +131,7 @@
assertThat(last, IsEqual(n))
}
- inline fun assertIAE(block: () -> Unit) {
+ private inline fun assertIAE(block: () -> Unit) {
try {
block()
expectUnreached()
@@ -140,7 +140,7 @@
}
}
- inline fun assertNSE(block: () -> Unit) {
+ private inline fun assertNSE(block: () -> Unit) {
try {
block()
expectUnreached()
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
index bd5db8e..2bf197d 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
@@ -9,13 +9,12 @@
import org.junit.*
import org.junit.Assert.*
import org.reactivestreams.*
-import kotlin.coroutines.experimental.*
class PublishTest : TestBase() {
@Test
- fun testBasicEmpty() = runBlocking<Unit> {
+ fun testBasicEmpty() = runBlocking {
expect(1)
- val publisher = publish<Int>(coroutineContext) {
+ val publisher = publish<Int> {
expect(5)
}
expect(2)
@@ -31,9 +30,9 @@
}
@Test
- fun testBasicSingle() = runBlocking<Unit> {
+ fun testBasicSingle() = runBlocking {
expect(1)
- val publisher = publish<Int>(coroutineContext) {
+ val publisher = publish {
expect(5)
send(42)
expect(7)
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt
index 5d5d2f8..c966fe8 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt
@@ -7,11 +7,10 @@
import kotlinx.coroutines.experimental.*
import org.junit.*
import org.reactivestreams.*
-import kotlin.coroutines.experimental.*
class PublisherBackpressureTest : TestBase() {
@Test
- fun testCancelWhileBPSuspended() = runBlocking<Unit> {
+ fun testCancelWhileBPSuspended() = runBlocking {
expect(1)
val observable = publish(coroutineContext) {
expect(5)
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt
index 8bab2d3..7dbf0a3 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherCompletionStressTest.kt
@@ -4,18 +4,15 @@
package kotlinx.coroutines.experimental.reactive
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.TestBase
-import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.withTimeout
-import org.junit.Test
+import kotlinx.coroutines.experimental.*
+import org.junit.*
import java.util.*
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.*
class PublisherCompletionStressTest : TestBase() {
- val N_REPEATS = 10_000 * stressTestMultiplier
+ private val N_REPEATS = 10_000 * stressTestMultiplier
- fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+ private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish(context) {
for (x in start until start + count) send(x)
}
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
index 5b324ab..2dcb84c 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt
@@ -9,9 +9,6 @@
import org.junit.*
import org.junit.Assert.*
-/**
- * Test emitting multiple values with [publish].
- */
class PublisherMultiTest : TestBase() {
@Test
fun testConcurrentStress() = runBlocking {
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt
index a111a73..93ed7fa 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt
@@ -10,10 +10,9 @@
import org.junit.Assert.*
import org.junit.runner.*
import org.junit.runners.*
-import kotlin.coroutines.experimental.*
@RunWith(Parameterized::class)
-class PublisherSubscriptionSelectTest(val request: Int) : TestBase() {
+class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() {
companion object {
@Parameterized.Parameters(name = "request = {0}")
@JvmStatic
@@ -24,7 +23,7 @@
fun testSelect() = runTest {
// source with n ints
val n = 1000 * stressTestMultiplier
- val source = publish(coroutineContext) { repeat(n) { send(it) } }
+ val source = publish { repeat(n) { send(it) } }
var a = 0
var b = 0
// open two subs
diff --git a/reactive/kotlinx-coroutines-reactor/README.md b/reactive/kotlinx-coroutines-reactor/README.md
index 494b0fc..5a0e31a 100644
--- a/reactive/kotlinx-coroutines-reactor/README.md
+++ b/reactive/kotlinx-coroutines-reactor/README.md
@@ -29,8 +29,8 @@
<!--- INDEX kotlinx.coroutines.experimental.channels -->
<!--- MODULE kotlinx-coroutines-reactor -->
<!--- INDEX kotlinx.coroutines.experimental.reactor -->
-[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/mono.html
-[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/flux.html
+[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.-coroutine-scope/mono.html
+[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.-coroutine-scope/flux.html
[kotlinx.coroutines.experimental.Job.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.-job/as-mono.html
[kotlinx.coroutines.experimental.Deferred.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.-deferred/as-mono.html
[kotlinx.coroutines.experimental.channels.ReceiveChannel.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.experimental.reactor/kotlinx.coroutines.experimental.channels.-receive-channel/as-flux.html
diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt
index 940a621..03366d0 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt
@@ -4,13 +4,10 @@
package kotlinx.coroutines.experimental.reactor
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.Deferred
-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import reactor.core.publisher.*
+import kotlin.coroutines.experimental.*
/**
* Converts this job to the hot reactive mono that signals
@@ -21,7 +18,7 @@
*
* @param context -- the coroutine context from which the resulting mono is going to be signalled
*/
-public fun Job.asMono(context: CoroutineContext = DefaultDispatcher): Mono<Unit> = mono(context) { this@asMono.join() }
+public fun Job.asMono(context: CoroutineContext = DefaultDispatcher): Mono<Unit> = GlobalScope.mono(context) { this@asMono.join() }
/**
* Converts this deferred value to the hot reactive mono that signals
@@ -32,7 +29,7 @@
*
* @param context -- the coroutine context from which the resulting mono is going to be signalled
*/
-public fun <T> Deferred<T?>.asMono(context: CoroutineContext = DefaultDispatcher): Mono<T> = mono(context) { this@asMono.await() }
+public fun <T> Deferred<T?>.asMono(context: CoroutineContext = DefaultDispatcher): Mono<T> = GlobalScope.mono(context) { this@asMono.await() }
/**
* Converts a stream of elements received from the channel to the hot reactive flux.
@@ -42,7 +39,7 @@
*
* @param context -- the coroutine context from which the resulting flux is going to be signalled
*/
-public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = DefaultDispatcher): Flux<T> = flux(context) {
+public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = DefaultDispatcher): Flux<T> = GlobalScope.flux(context) {
for (t in this@asFlux)
send(t)
}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index cde3709..a21939f 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -4,17 +4,22 @@
package kotlinx.coroutines.experimental.reactor
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.channels.ProducerScope
-import kotlinx.coroutines.experimental.reactive.publish
-import reactor.core.publisher.Flux
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.reactive.*
+import reactor.core.publisher.*
+import kotlin.coroutines.experimental.*
/**
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
*
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
+ *
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
@@ -24,8 +29,23 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*/
+fun <T> CoroutineScope.flux(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend ProducerScope<T>.() -> Unit
+): Flux<T> = Flux.from(publish(newCoroutineContext(context), block = block))
+
+
+/**
+ * Creates cold reactive [Flux] that runs a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.mono] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.flux(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.reactor.flux"])
+)
@JvmOverloads // for binary compatibility with older code compiled before context had a default
fun <T> flux(
context: CoroutineContext = DefaultDispatcher,
block: suspend ProducerScope<T>.() -> Unit
-): Flux<T> = Flux.from(publish(context, block = block))
+): Flux<T> = GlobalScope.flux(context, block)
diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
index e77eaae..5cc0742 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
@@ -19,37 +19,38 @@
* | Returns a null | `success`
* | Failure with exception or unsubscribe | `error`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine..
* @param block the coroutine code.
*/
-fun <T> mono(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+fun <T> CoroutineScope.mono(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold [mono][Mono] that will run a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.mono] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.mono(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.reactor.mono"])
+)
fun <T> mono(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend CoroutineScope.() -> T?
-): Mono<T> =
- mono(context, block = block)
+): Mono<T> = GlobalScope.mono(context + (parent ?: EmptyCoroutineContext), block)
private class MonoCoroutine<in T>(
parentContext: CoroutineContext,
diff --git a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
index 85a2b2f..ee80cb9 100644
--- a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt
@@ -112,7 +112,7 @@
throw TestException("K")
}
val flux = c.asFlux(Unconfined)
- val mono = mono(Unconfined) {
+ val mono = GlobalScope.mono(Unconfined) {
var result = ""
try {
flux.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt
index 19c4f4f..1c9bf2e 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxCompletionStressTest.kt
@@ -4,19 +4,16 @@
package kotlinx.coroutines.experimental.reactor
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.TestBase
-import kotlinx.coroutines.experimental.reactive.consumeEach
-import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.withTimeout
-import org.junit.Test
-import java.util.Random
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.reactive.*
+import org.junit.*
+import java.util.*
+import kotlin.coroutines.experimental.*
class FluxCompletionStressTest : TestBase() {
- val N_REPEATS = 10_000 * stressTestMultiplier
+ private val N_REPEATS = 10_000 * stressTestMultiplier
- fun range(context: CoroutineContext, start: Int, count: Int) = flux(context) {
+ private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = flux(context) {
for (x in start until start + count) send(x)
}
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
index 4581c61..6f4411b 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
@@ -11,14 +11,11 @@
import reactor.core.publisher.*
import java.io.*
-/**
- * Test emitting multiple values with [flux].
- */
class FluxMultiTest : TestBase() {
@Test
fun testNumbers() {
val n = 100 * stressTestMultiplier
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
repeat(n) { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
@@ -29,7 +26,7 @@
@Test
fun testConcurrentStress() {
val n = 10_000 * stressTestMultiplier
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
@@ -48,7 +45,7 @@
@Test
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
- val flux = flux(Unconfined) {
+ val flux = GlobalScope.flux(Unconfined) {
Flux.range(0, n).consumeEach { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
@@ -59,7 +56,7 @@
@Test
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
Flux.range(0, n).consumeEach { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
@@ -69,11 +66,11 @@
@Test
fun testSendAndCrash() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send("O")
throw IOException("K")
}
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
var result = ""
try {
flux.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
index c284f48..deb5271 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
@@ -4,22 +4,17 @@
package kotlinx.coroutines.experimental.reactor
-import kotlinx.coroutines.experimental.DefaultDispatcher
+import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
-import kotlinx.coroutines.experimental.runBlocking
-import org.junit.Assert.assertEquals
-import org.junit.Assert.fail
-import org.junit.Test
-import reactor.core.publisher.Flux
-import java.time.Duration.ofMillis
+import org.junit.*
+import org.junit.Assert.*
+import reactor.core.publisher.*
+import java.time.Duration.*
-/**
- * Tests emitting single item with [flux].
- */
class FluxSingleTest {
@Test
fun testSingleNoWait() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send("OK")
}
@@ -35,7 +30,7 @@
@Test
fun testSingleEmitAndAwait() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.just("O").awaitSingle() + "K")
}
@@ -46,7 +41,7 @@
@Test
fun testSingleWithDelay() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K")
}
@@ -57,7 +52,7 @@
@Test
fun testSingleException() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.just("O", "K").awaitSingle() + "K")
}
@@ -68,7 +63,7 @@
@Test
fun testAwaitFirst() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.just("O", "#").awaitFirst() + "K")
}
@@ -79,7 +74,7 @@
@Test
fun testAwaitFirstOrDefault() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.empty<String>().awaitFirstOrDefault("O") + "K")
}
@@ -90,7 +85,7 @@
@Test
fun testAwaitFirstOrDefaultWithValues() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.just("O", "#").awaitFirstOrDefault("!") + "K")
}
@@ -101,7 +96,7 @@
@Test
fun testAwaitFirstOrNull() {
- val flux = flux<String>(DefaultDispatcher) {
+ val flux = GlobalScope.flux<String> {
send(Flux.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -112,7 +107,7 @@
@Test
fun testAwaitFirstOrNullWithValues() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
}
@@ -123,7 +118,7 @@
@Test
fun testAwaitFirstOrElse() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.empty<String>().awaitFirstOrElse { "O" } + "K")
}
@@ -134,7 +129,7 @@
@Test
fun testAwaitFirstOrElseWithValues() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K")
}
@@ -145,7 +140,7 @@
@Test
fun testAwaitLast() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
send(Flux.just("#", "O").awaitLast() + "K")
}
@@ -156,7 +151,7 @@
@Test
fun testExceptionFromObservable() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
try {
send(Flux.error<String>(RuntimeException("O")).awaitFirst())
} catch (e: RuntimeException) {
@@ -171,7 +166,7 @@
@Test
fun testExceptionFromCoroutine() {
- val flux = flux<String>(DefaultDispatcher) {
+ val flux = GlobalScope.flux<String> {
error(Flux.just("O").awaitSingle() + "K")
}
@@ -183,7 +178,7 @@
@Test
fun testFluxIteration() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
var result = ""
Flux.just("O", "K").consumeEach { result += it }
send(result)
@@ -196,7 +191,7 @@
@Test
fun testFluxIterationFailure() {
- val flux = flux(DefaultDispatcher) {
+ val flux = GlobalScope.flux {
try {
Flux.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
send("Fail")
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
index b99d151..7ebd80d 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
@@ -7,13 +7,12 @@
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.*
import org.junit.*
-import kotlin.coroutines.experimental.*
class FluxTest : TestBase() {
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val flux = flux(coroutineContext) {
+ val flux = flux {
expect(4)
send("OK")
}
@@ -30,7 +29,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val flux = flux<String>(coroutineContext) {
+ val flux = flux<String> {
expect(4)
throw RuntimeException("OK")
}
@@ -50,7 +49,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val flux = flux<String>(coroutineContext) {
+ val flux = flux<String> {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
index 048c6bd..e840a32 100644
--- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
@@ -11,11 +11,7 @@
import org.junit.Assert.*
import reactor.core.publisher.*
import java.time.Duration.*
-import kotlin.coroutines.experimental.*
-/**
- * Tests emitting single item with [mono].
- */
class MonoTest : TestBase() {
@Before
fun setup() {
@@ -25,7 +21,7 @@
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
- val mono = mono(coroutineContext) {
+ val mono = mono {
expect(4)
"OK"
}
@@ -42,7 +38,7 @@
@Test
fun testBasicFailure() = runBlocking {
expect(1)
- val mono = mono(coroutineContext) {
+ val mono = mono {
expect(4)
throw RuntimeException("OK")
}
@@ -62,7 +58,7 @@
@Test
fun testBasicEmpty() = runBlocking {
expect(1)
- val mono = mono(coroutineContext) {
+ val mono = mono {
expect(4)
null
}
@@ -78,7 +74,7 @@
@Test
fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val mono = mono(coroutineContext) {
+ val mono = mono {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -100,7 +96,7 @@
@Test
fun testMonoNoWait() {
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
"OK"
}
@@ -116,7 +112,7 @@
@Test
fun testMonoEmitAndAwait() {
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
Mono.just("O").awaitSingle() + "K"
}
@@ -127,7 +123,7 @@
@Test
fun testMonoWithDelay() {
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
}
@@ -138,7 +134,7 @@
@Test
fun testMonoException() {
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
Flux.just("O", "K").awaitSingle() + "K"
}
@@ -149,7 +145,7 @@
@Test
fun testAwaitFirst() {
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
Flux.just("O", "#").awaitFirst() + "K"
}
@@ -160,7 +156,7 @@
@Test
fun testAwaitLast() {
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
Flux.just("#", "O").awaitLast() + "K"
}
@@ -171,7 +167,7 @@
@Test
fun testExceptionFromFlux() {
- val mono = mono(DefaultDispatcher) {
+ val mono = GlobalScope.mono {
try {
Flux.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
@@ -186,7 +182,7 @@
@Test
fun testExceptionFromCoroutine() {
- val mono = mono<String>(DefaultDispatcher) {
+ val mono = GlobalScope.mono<String> {
throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
}
diff --git a/reactive/kotlinx-coroutines-rx1/README.md b/reactive/kotlinx-coroutines-rx1/README.md
index bd85189..b4d259b 100644
--- a/reactive/kotlinx-coroutines-rx1/README.md
+++ b/reactive/kotlinx-coroutines-rx1/README.md
@@ -23,8 +23,7 @@
| [Observable.awaitLast][rx.Observable.awaitFirst] | Returns the last value from the given observable
| [Observable.awaitSingle][rx.Observable.awaitSingle] | Returns the single value from the given observable
| [Observable.openSubscription][rx.Observable.openSubscription] | Subscribes to observable and returns [ReceiveChannel]
-| [Observable.iterator][rx.Observable.iterator] | Subscribes to observable and returns [ChannelIterator]
-
+./gradlew knit
Conversion functions:
| **Name** | **Description**
@@ -39,12 +38,11 @@
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
-[ChannelIterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel-iterator/index.html
<!--- MODULE kotlinx-coroutines-rx1 -->
<!--- INDEX kotlinx.coroutines.experimental.rx1 -->
-[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-completable.html
-[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-single.html
-[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx-observable.html
+[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/kotlinx.coroutines.experimental.-coroutine-scope/rx-completable.html
+[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/kotlinx.coroutines.experimental.-coroutine-scope/rx-single.html
+[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/kotlinx.coroutines.experimental.-coroutine-scope/rx-observable.html
[rx.Completable.awaitCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-completable/await-completed.html
[rx.Single.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-single/await.html
[rx.Observable.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first.html
@@ -53,7 +51,6 @@
[rx.Observable.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-first-or-null.html
[rx.Observable.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/await-single.html
[rx.Observable.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/open-subscription.html
-[rx.Observable.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/rx.-observable/iterator.html
[kotlinx.coroutines.experimental.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/kotlinx.coroutines.experimental.-job/as-completable.html
[kotlinx.coroutines.experimental.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/kotlinx.coroutines.experimental.-deferred/as-single.html
[kotlinx.coroutines.experimental.channels.ReceiveChannel.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx1/kotlinx.coroutines.experimental.rx1/kotlinx.coroutines.experimental.channels.-receive-channel/as-observable.html
diff --git a/reactive/kotlinx-coroutines-rx1/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx1/src/RxChannel.kt
index 5d49346..58434db 100644
--- a/reactive/kotlinx-coroutines-rx1/src/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/RxChannel.kt
@@ -30,27 +30,6 @@
openSubscription(request) as SubscriptionReceiveChannel<T>
/**
- * @suppress **Deprecated**: Renamed to [openSubscription]
- */
-@Deprecated(message = "Renamed to `openSubscription`",
- replaceWith = ReplaceWith("openSubscription()"))
-public fun <T> Observable<T>.open(): SubscriptionReceiveChannel<T> =
- openSubscription() as SubscriptionReceiveChannel<T>
-
-/**
- * Subscribes to this [Observable] and returns an iterator to receive elements emitted by it.
- *
- * This is a shortcut for `open().iterator()`. See [openSubscription] if you need an ability to manually
- * unsubscribe from the observable.
- */
-@Suppress("DeprecatedCallableAddReplaceWith")
-@Deprecated(message =
-"This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
- "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " +
- "Use `source.consumeEach { x -> ... }`.")
-public operator fun <T> Observable<T>.iterator() = openSubscription().iterator()
-
-/**
* Subscribes to this [Observable] and performs the specified action for each received element.
*/
public suspend inline fun <T> Observable<T>.consumeEach(action: (T) -> Unit) {
@@ -59,13 +38,6 @@
channel.cancel()
}
-/**
- * @suppress: **Deprecated**: binary compatibility with old code
- */
-@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
-public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) =
- consumeEach { action(it) }
-
private class SubscriptionChannel<T>(
private val request: Int
) : LinkedListChannel<T>(), ReceiveChannel<T>, SubscriptionReceiveChannel<T> {
diff --git a/reactive/kotlinx-coroutines-rx1/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx1/src/RxCompletable.kt
index af62bb2..5e2ee75 100644
--- a/reactive/kotlinx-coroutines-rx1/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/RxCompletable.kt
@@ -18,37 +18,38 @@
* | Completes successfully | `onCompleted`
* | Failure with exception or unsubscribe | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine..
* @param block the coroutine code.
*/
-public fun rxCompletable(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun CoroutineScope.rxCompletable(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Unit
): Completable = Completable.create { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = RxCompletableCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold [Completable] that runs a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxCompletable] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxCompletable(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx1.rxCompletable"])
+)
public fun rxCompletable(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
-): Completable =
- rxCompletable(context, block = block)
+): Completable = GlobalScope.rxCompletable(context + (parent ?: EmptyCoroutineContext), block)
private class RxCompletableCoroutine(
parentContext: CoroutineContext,
diff --git a/reactive/kotlinx-coroutines-rx1/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx1/src/RxConvert.kt
index 013f382..0ee0d7c 100644
--- a/reactive/kotlinx-coroutines-rx1/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/RxConvert.kt
@@ -4,11 +4,10 @@
package kotlinx.coroutines.experimental.rx1
-import kotlinx.coroutines.experimental.Deferred
-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
import rx.*
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.*
/**
* Converts this job to the hot reactive completable that signals
@@ -19,7 +18,7 @@
*
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
-public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
+public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalScope.rxCompletable(context) {
this@asCompletable.join()
}
@@ -32,7 +31,7 @@
*
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
-public fun <T> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle<T>(context) {
+public fun <T> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle<T>(context) {
this@asSingle.await()
}
@@ -44,28 +43,7 @@
*
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
-public fun <T> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
+public fun <T> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = GlobalScope.rxObservable(context) {
for (t in this@asObservable)
send(t)
}
-
-/**
- * @suppress **Deprecated**: Renamed to [asCompletable]
- */
-@Deprecated(message = "Renamed to `asCompletable`",
- replaceWith = ReplaceWith("asCompletable(context)"))
-public fun Job.toCompletable(context: CoroutineContext): Completable = asCompletable(context)
-
-/**
- * @suppress **Deprecated**: Renamed to [asSingle]
- */
-@Deprecated(message = "Renamed to `asSingle`",
- replaceWith = ReplaceWith("asSingle(context)"))
-public fun <T> Deferred<T>.toSingle(context: CoroutineContext): Single<T> = asSingle(context)
-
-/**
- * @suppress **Deprecated**: Renamed to [asObservable]
- */
-@Deprecated(message = "Renamed to `asObservable`",
- replaceWith = ReplaceWith("asObservable(context)"))
-public fun <T> ReceiveChannel<T>.toObservable(context: CoroutineContext): Observable<T> = asObservable(context)
diff --git a/reactive/kotlinx-coroutines-rx1/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx1/src/RxObservable.kt
index 7a19152..2cc646d 100644
--- a/reactive/kotlinx-coroutines-rx1/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/RxObservable.kt
@@ -26,38 +26,39 @@
* | Normal completion or `close` without cause | `onCompleted`
* | Failure with exception or `close` with cause | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
- *
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
+ **
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
-public fun <T> rxObservable(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun <T> CoroutineScope.rxObservable(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = RxObservableCoroutine(newContext, subscriber)
subscriber.setProducer(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
subscriber.add(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ *Creates cold [Observable] that runs a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxObservable] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxObservable(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx1.rxObservable"])
+)
public fun <T> rxObservable(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend ProducerScope<T>.() -> Unit
-): Observable<T> =
- rxObservable(context, block = block)
+): Observable<T> = GlobalScope.rxObservable(context + (parent ?: EmptyCoroutineContext), block)
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
diff --git a/reactive/kotlinx-coroutines-rx1/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx1/src/RxSingle.kt
index 8bbb54e..88e7202 100644
--- a/reactive/kotlinx-coroutines-rx1/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/RxSingle.kt
@@ -18,37 +18,38 @@
* | Returns a value | `onSuccess`
* | Failure with exception or unsubscribe | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
-public fun <T> rxSingle(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun <T> CoroutineScope.rxSingle(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = RxSingleCoroutine(newContext, subscriber)
subscriber.add(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold [Single] that runs a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxSingle] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxSingle(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx1.rxSingle"])
+)
public fun <T> rxSingle(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend CoroutineScope.() -> T
-): Single<T> =
- rxSingle(context, block = block)
+): Single<T> = GlobalScope.rxSingle(context + (parent ?: EmptyCoroutineContext), block)
private class RxSingleCoroutine<T>(
parentContext: CoroutineContext,
diff --git a/reactive/kotlinx-coroutines-rx1/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx1/test/CompletableTest.kt
index 884a492..55a64b8 100644
--- a/reactive/kotlinx-coroutines-rx1/test/CompletableTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/CompletableTest.kt
@@ -8,13 +8,12 @@
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
-import kotlin.coroutines.experimental.*
class CompletableTest : TestBase() {
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(4)
}
expect(2)
@@ -27,9 +26,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(4)
throw RuntimeException("OK")
}
@@ -47,9 +46,9 @@
}
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -70,9 +69,9 @@
}
@Test
- fun testAwaitSuccess() = runBlocking<Unit> {
+ fun testAwaitSuccess() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(3)
}
expect(2)
@@ -81,9 +80,9 @@
}
@Test
- fun testAwaitFailure() = runBlocking<Unit> {
+ fun testAwaitFailure() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(3)
throw RuntimeException("OK")
}
diff --git a/reactive/kotlinx-coroutines-rx1/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx1/test/ConvertTest.kt
index 44f983f..be8b3b4 100644
--- a/reactive/kotlinx-coroutines-rx1/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/ConvertTest.kt
@@ -14,9 +14,9 @@
class TestException(s: String): RuntimeException(s)
@Test
- fun testToCompletableSuccess() = runBlocking<Unit> {
+ fun testToCompletableSuccess() = runBlocking {
expect(1)
- val job = launch(coroutineContext) {
+ val job = launch {
expect(3)
}
val completable = job.asCompletable(coroutineContext)
@@ -29,9 +29,9 @@
}
@Test
- fun testToCompletableFail() = runBlocking<Unit> {
+ fun testToCompletableFail() = runBlocking {
expect(1)
- val job = async(coroutineContext + NonCancellable) { // don't kill parent on exception
+ val job = async(NonCancellable) { // don't kill parent on exception
expect(3)
throw RuntimeException("OK")
}
@@ -80,7 +80,7 @@
@Test
fun testToObservable() {
- val c = GlobalScope.produce(DefaultDispatcher) {
+ val c = GlobalScope.produce {
delay(50)
send("O")
delay(50)
@@ -94,14 +94,14 @@
@Test
fun testToObservableFail() {
- val c = GlobalScope.produce(DefaultDispatcher) {
+ val c = GlobalScope.produce {
delay(50)
send("O")
delay(50)
throw TestException("K")
}
val observable = c.asObservable(Unconfined)
- val single = rxSingle(Unconfined) {
+ val single = GlobalScope.rxSingle(Unconfined) {
var result = ""
try {
observable.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-rx1/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx1/test/IntegrationTest.kt
index 45a2a9d..cb1554e 100644
--- a/reactive/kotlinx-coroutines-rx1/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/IntegrationTest.kt
@@ -15,8 +15,8 @@
@RunWith(Parameterized::class)
class IntegrationTest(
- val ctx: Ctx,
- val delay: Boolean
+ private val ctx: Ctx,
+ private val delay: Boolean
) : TestBase() {
enum class Ctx {
@@ -57,8 +57,8 @@
}
@Test
- fun testSingle() = runBlocking<Unit> {
- val observable = rxObservable<String>(ctx(coroutineContext)) {
+ fun testSingle() = runBlocking {
+ val observable = CoroutineScope(ctx(coroutineContext)).rxObservable {
if (delay) delay(1)
send("OK")
}
@@ -77,7 +77,7 @@
}
@Test
- fun testObservableWithNull() = runBlocking<Unit> {
+ fun testObservableWithNull() = runBlocking {
val observable = rxObservable<String?>(ctx(coroutineContext)) {
if (delay) delay(1)
send(null)
@@ -99,7 +99,7 @@
@Test
fun testNumbers() = runBlocking<Unit> {
val n = 100 * stressTestMultiplier
- val observable = rxObservable<Int>(ctx(coroutineContext)) {
+ val observable = CoroutineScope(ctx(coroutineContext)).rxObservable {
for (i in 1..n) {
send(i)
if (delay) delay(1)
@@ -120,7 +120,7 @@
@Test
fun testCancelWithoutValue() = runTest {
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
- rxObservable<String>(coroutineContext) {
+ rxObservable<String> {
yield()
expectUnreached()
}.awaitFirst()
@@ -134,7 +134,7 @@
fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
expect(1)
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
- rxObservable<String>(coroutineContext) {
+ rxObservable<String> {
yield()
expect(2)
// Nothing to emit
@@ -153,7 +153,7 @@
assertThat(last, IsEqual(n))
}
- inline fun assertIAE(block: () -> Unit) {
+ private inline fun assertIAE(block: () -> Unit) {
try {
block()
expectUnreached()
@@ -162,7 +162,7 @@
}
}
- inline fun assertNSE(block: () -> Unit) {
+ private inline fun assertNSE(block: () -> Unit) {
try {
block()
expectUnreached()
diff --git a/reactive/kotlinx-coroutines-rx1/test/ObservableBackpressureTest.kt b/reactive/kotlinx-coroutines-rx1/test/ObservableBackpressureTest.kt
index d26900f..d3dca5a 100644
--- a/reactive/kotlinx-coroutines-rx1/test/ObservableBackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/ObservableBackpressureTest.kt
@@ -7,13 +7,12 @@
import kotlinx.coroutines.experimental.*
import org.junit.*
import rx.*
-import kotlin.coroutines.experimental.*
class ObservableBackpressureTest : TestBase() {
@Test
- fun testCancelWhileBPSuspended() = runBlocking<Unit> {
+ fun testCancelWhileBPSuspended() = runBlocking {
expect(1)
- val observable = rxObservable(coroutineContext) {
+ val observable = rxObservable {
expect(5)
send("A") // will not suspend, because an item was requested
expect(7)
diff --git a/reactive/kotlinx-coroutines-rx1/test/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx1/test/ObservableCompletionStressTest.kt
index 8a16fde..64dfac7 100644
--- a/reactive/kotlinx-coroutines-rx1/test/ObservableCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/ObservableCompletionStressTest.kt
@@ -4,18 +4,15 @@
package kotlinx.coroutines.experimental.rx1
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.TestBase
-import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.withTimeout
-import org.junit.Test
+import kotlinx.coroutines.experimental.*
+import org.junit.*
import java.util.*
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.*
class ObservableCompletionStressTest : TestBase() {
- val N_REPEATS = 10_000 * stressTestMultiplier
+ private val N_REPEATS = 10_000 * stressTestMultiplier
- fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable<Int>(context) {
+ private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable<Int>(context) {
for (x in start until start + count) send(x)
}
diff --git a/reactive/kotlinx-coroutines-rx1/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx1/test/ObservableMultiTest.kt
index 649590f..8a578f5 100644
--- a/reactive/kotlinx-coroutines-rx1/test/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/ObservableMultiTest.kt
@@ -17,7 +17,7 @@
@Test
fun testNumbers() {
val n = 100 * stressTestMultiplier
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
repeat(n) { send(it) }
}
checkSingleValue(observable.toList()) { list ->
@@ -28,7 +28,7 @@
@Test
fun testConcurrentStress() {
val n = 10_000 * stressTestMultiplier
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
@@ -47,7 +47,7 @@
@Test
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
- val observable = rxObservable(Unconfined) {
+ val observable = GlobalScope.rxObservable(Unconfined) {
Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
@@ -58,7 +58,7 @@
@Test
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
@@ -68,11 +68,11 @@
@Test
fun testSendAndCrash() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send("O")
throw IOException("K")
}
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
var result = ""
try {
observable.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-rx1/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx1/test/ObservableSingleTest.kt
index 0a47ba8..4f351c0 100644
--- a/reactive/kotlinx-coroutines-rx1/test/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/ObservableSingleTest.kt
@@ -4,21 +4,16 @@
package kotlinx.coroutines.experimental.rx1
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.runBlocking
-import org.junit.Assert.assertEquals
-import org.junit.Assert.fail
-import org.junit.Test
-import rx.Observable
-import java.util.concurrent.TimeUnit
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+import org.junit.Assert.*
+import rx.*
+import java.util.concurrent.*
-/**
- * Tests emitting single item with [rxObservable].
- */
class ObservableSingleTest {
@Test
fun testSingleNoWait() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send("OK")
}
@@ -29,7 +24,7 @@
@Test
fun testSingleNullNoWait() {
- val observable = rxObservable<String?>(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable<String?> {
send(null)
}
@@ -45,7 +40,7 @@
@Test
fun testSingleEmitAndAwait() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O").awaitSingle() + "K")
}
@@ -56,7 +51,7 @@
@Test
fun testSingleWithDelay() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
}
@@ -67,7 +62,7 @@
@Test
fun testSingleException() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "K").awaitSingle() + "K")
}
@@ -78,7 +73,7 @@
@Test
fun testAwaitFirst() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "#").awaitFirst() + "K")
}
@@ -89,7 +84,7 @@
@Test
fun testAwaitFirstOrDefault() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
}
@@ -100,7 +95,7 @@
@Test
fun testAwaitFirstOrDefaultWithValues() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
}
@@ -111,7 +106,7 @@
@Test
fun testAwaitFirstOrNull() {
- val observable = rxObservable<String>(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable<String> {
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -122,7 +117,7 @@
@Test
fun testAwaitFirstOrNullWithValues() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
}
@@ -133,7 +128,7 @@
@Test
fun testAwaitFirstOrElse() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
}
@@ -144,7 +139,7 @@
@Test
fun testAwaitFirstOrElseWithValues() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
}
@@ -155,7 +150,7 @@
@Test
fun testAwaitLast() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("#", "O").awaitLast() + "K")
}
@@ -166,7 +161,7 @@
@Test
fun testExceptionFromObservable() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
try {
send(Observable.error<String>(RuntimeException("O")).awaitFirst())
} catch (e: RuntimeException) {
@@ -181,7 +176,7 @@
@Test
fun testExceptionFromCoroutine() {
- val observable = rxObservable<String>(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable<String> {
error(Observable.just("O").awaitSingle() + "K")
}
@@ -193,7 +188,7 @@
@Test
fun testObservableIteration() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
var result = ""
Observable.just("O", "K").consumeEach {result += it }
send(result)
@@ -206,7 +201,7 @@
@Test
fun testObservableIterationFailure() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
try {
Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
send("Fail")
diff --git a/reactive/kotlinx-coroutines-rx1/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx1/test/ObservableSubscriptionSelectTest.kt
index 18df74f..1dab65a 100644
--- a/reactive/kotlinx-coroutines-rx1/test/ObservableSubscriptionSelectTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/ObservableSubscriptionSelectTest.kt
@@ -10,10 +10,9 @@
import org.junit.Assert.*
import org.junit.runner.*
import org.junit.runners.*
-import kotlin.coroutines.experimental.*
@RunWith(Parameterized::class)
-class ObservableSubscriptionSelectTest(val request: Int) : TestBase() {
+class ObservableSubscriptionSelectTest(private val request: Int) : TestBase() {
companion object {
@Parameterized.Parameters(name = "request = {0}")
@JvmStatic
@@ -24,7 +23,7 @@
fun testSelect() = runTest {
// source with n ints
val n = 1000 * stressTestMultiplier
- val source = rxObservable(coroutineContext) { repeat(n) { send(it) } }
+ val source = rxObservable { repeat(n) { send(it) } }
var a = 0
var b = 0
// open two subs
diff --git a/reactive/kotlinx-coroutines-rx1/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx1/test/ObservableTest.kt
index 1ebc8e9..cbf3936 100644
--- a/reactive/kotlinx-coroutines-rx1/test/ObservableTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/ObservableTest.kt
@@ -7,13 +7,12 @@
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.*
import org.junit.*
-import kotlin.coroutines.experimental.*
class ObservableTest : TestBase() {
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val observable = rxObservable(coroutineContext) {
+ val observable = rxObservable {
expect(4)
send("OK")
}
@@ -28,9 +27,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val observable = rxObservable<String>(coroutineContext) {
+ val observable = rxObservable<String> {
expect(4)
throw RuntimeException("OK")
}
@@ -48,9 +47,9 @@
}
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val observable = rxObservable<String>(coroutineContext) {
+ val observable = rxObservable<String> {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
diff --git a/reactive/kotlinx-coroutines-rx1/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx1/test/SingleTest.kt
index dc36b62..461de8b 100644
--- a/reactive/kotlinx-coroutines-rx1/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx1/test/SingleTest.kt
@@ -11,11 +11,7 @@
import org.junit.Assert.*
import rx.*
import java.util.concurrent.*
-import kotlin.coroutines.experimental.*
-/**
- * Tests emitting single item with [rxSingle].
- */
class SingleTest : TestBase() {
@Before
fun setup() {
@@ -23,9 +19,9 @@
}
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val single = rxSingle(coroutineContext) {
+ val single = rxSingle {
expect(4)
"OK"
}
@@ -40,9 +36,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val single = rxSingle(coroutineContext) {
+ val single = rxSingle {
expect(4)
throw RuntimeException("OK")
}
@@ -61,9 +57,9 @@
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val single = rxSingle(coroutineContext) {
+ val single = rxSingle {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -85,7 +81,7 @@
@Test
fun testSingleNoWait() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
"OK"
}
@@ -96,7 +92,7 @@
@Test
fun testSingleNullNoWait() {
- val single = rxSingle<String?>(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle<String?> {
null
}
@@ -112,7 +108,7 @@
@Test
fun testSingleEmitAndAwait() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Single.just("O").await() + "K"
}
@@ -123,7 +119,7 @@
@Test
fun testSingleWithDelay() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
}
@@ -134,7 +130,7 @@
@Test
fun testSingleException() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.just("O", "K").awaitSingle() + "K"
}
@@ -145,7 +141,7 @@
@Test
fun testAwaitFirst() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.just("O", "#").awaitFirst() + "K"
}
@@ -156,7 +152,7 @@
@Test
fun testAwaitLast() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.just("#", "O").awaitLast() + "K"
}
@@ -167,7 +163,7 @@
@Test
fun testExceptionFromObservable() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
try {
Observable.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
@@ -182,7 +178,7 @@
@Test
fun testExceptionFromCoroutine() {
- val single = rxSingle<String>(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle<String> {
throw RuntimeException(Observable.just("O").awaitSingle() + "K")
}
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md
index 7bb2b95..a1ad7d1 100644
--- a/reactive/kotlinx-coroutines-rx2/README.md
+++ b/reactive/kotlinx-coroutines-rx2/README.md
@@ -28,7 +28,6 @@
| [ObservableSource.awaitLast][io.reactivex.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
| [ObservableSource.awaitSingle][io.reactivex.ObservableSource.awaitSingle] | Awaits for the single value from the given observable
| [ObservableSource.openSubscription][io.reactivex.ObservableSource.openSubscription] | Subscribes to observable and returns [ReceiveChannel]
-| [ObservableSource.iterator][io.reactivex.ObservableSource.iterator] | Subscribes to observable and returns [ChannelIterator]
Note, that `Flowable` is a subclass of [Reactive Streams](http://www.reactive-streams.org)
`Publisher` and extensions for it are covered by
@@ -50,14 +49,13 @@
<!--- INDEX kotlinx.coroutines.experimental.channels -->
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
-[ChannelIterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel-iterator/index.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.experimental.rx2 -->
-[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-completable.html
-[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-maybe.html
-[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-single.html
-[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-observable.html
-[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html
+[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-coroutine-scope/rx-completable.html
+[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-coroutine-scope/rx-maybe.html
+[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-coroutine-scope/rx-single.html
+[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-coroutine-scope/rx-observable.html
+[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-coroutine-scope/rx-flowable.html
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-completable-source/await.html
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await.html
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-maybe-source/await-or-default.html
@@ -69,7 +67,6 @@
[io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first-or-null.html
[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-single.html
[io.reactivex.ObservableSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/open-subscription.html
-[io.reactivex.ObservableSource.iterator]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/iterator.html
[kotlinx.coroutines.experimental.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-job/as-completable.html
[kotlinx.coroutines.experimental.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.-deferred/as-single.html
[kotlinx.coroutines.experimental.channels.ReceiveChannel.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/kotlinx.coroutines.experimental.channels.-receive-channel/as-observable.html
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
index 0b31efc..28bebc2 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
@@ -5,7 +5,7 @@
package kotlinx.coroutines.experimental.rx2
import io.reactivex.*
-import io.reactivex.disposables.Disposable
+import io.reactivex.disposables.*
import kotlinx.coroutines.experimental.channels.*
/**
@@ -24,15 +24,6 @@
@Suppress("CONFLICTING_OVERLOADS")
public fun <T> MaybeSource<T>.openSubscription(): SubscriptionReceiveChannel<T> =
openSubscription() as SubscriptionReceiveChannel<T>
-
-/**
- * @suppress **Deprecated**: Renamed to [openSubscription]
- */
-@Deprecated(message = "Renamed to `openSubscription`",
- replaceWith = ReplaceWith("openSubscription()"))
-public fun <T> MaybeSource<T>.open(): SubscriptionReceiveChannel<T> =
- openSubscription() as SubscriptionReceiveChannel<T>
-
/**
* Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it.
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source.
@@ -51,30 +42,9 @@
openSubscription() as SubscriptionReceiveChannel<T>
/**
- * @suppress **Deprecated**: Renamed to [openSubscription]
- */
-@Deprecated(message = "Renamed to `openSubscription`",
- replaceWith = ReplaceWith("openSubscription()"))
-public fun <T> ObservableSource<T>.open(): SubscriptionReceiveChannel<T> =
- openSubscription() as SubscriptionReceiveChannel<T>
-
-/**
- * Subscribes to this [Observable] and returns an iterator to receive elements emitted by it.
- *
- * This is a shortcut for `open().iterator()`. See [openSubscription] if you need an ability to manually
- * unsubscribe from the observable.
- */
-@Suppress("DeprecatedCallableAddReplaceWith")
-@Deprecated(message =
-"This iteration operator for `for (x in source) { ... }` loop is deprecated, " +
- "because it leaves code vulnerable to leaving unclosed subscriptions on exception. " +
- "Use `source.consumeEach { x -> ... }`.")
-public operator fun <T> ObservableSource<T>.iterator() = openSubscription().iterator()
-
-/**
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
*/
-public inline suspend fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) {
+public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) {
val channel = openSubscription()
for (x in channel) action(x)
channel.cancel()
@@ -83,19 +53,12 @@
/**
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
*/
-public inline suspend fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) {
+public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) {
val channel = openSubscription()
for (x in channel) action(x)
channel.cancel()
}
-/**
- * @suppress: **Deprecated**: binary compatibility with old code
- */
-@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
-public suspend fun <T> ObservableSource<T>.consumeEach(action: suspend (T) -> Unit) =
- consumeEach { action(it) }
-
private class SubscriptionChannel<T> :
LinkedListChannel<T>(), ReceiveChannel<T>, Observer<T>, MaybeObserver<T>, SubscriptionReceiveChannel<T>
{
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
index 32686d4..f055575 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
@@ -19,37 +19,38 @@
* | Completes successfully | `onCompleted`
* | Failure with exception or unsubscribe | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
-public fun rxCompletable(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun CoroutineScope.rxCompletable(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Unit
): Completable = Completable.create { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = RxCompletableCoroutine(newContext, subscriber)
subscriber.setCancellable(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold [Completable] that runs a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxCompletable] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxCompletable(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx2.rxCompletable"])
+)
public fun rxCompletable(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
-): Completable =
- rxCompletable(context, block = block)
+): Completable = GlobalScope.rxCompletable(context + (parent ?: EmptyCoroutineContext), block)
private class RxCompletableCoroutine(
parentContext: CoroutineContext,
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 801c59d..d0bade4 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -4,14 +4,10 @@
package kotlinx.coroutines.experimental.rx2
-import io.reactivex.Completable
-import io.reactivex.Maybe
-import kotlinx.coroutines.experimental.Deferred
-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import io.reactivex.Observable
-import io.reactivex.Single
-import kotlin.coroutines.experimental.CoroutineContext
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlin.coroutines.experimental.*
/**
* Converts this job to the hot reactive completable that signals
@@ -22,7 +18,7 @@
*
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
-public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
+public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalScope.rxCompletable(context) {
this@asCompletable.join()
}
@@ -35,7 +31,7 @@
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
-public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe<T>(context) {
+public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = GlobalScope.rxMaybe(context) {
this@asMaybe.await()
}
@@ -48,7 +44,7 @@
*
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
-public fun <T> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle<T>(context) {
+public fun <T> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = GlobalScope.rxSingle(context) {
this@asSingle.await()
}
@@ -60,28 +56,7 @@
*
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
-public fun <T> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = rxObservable(context) {
+public fun <T> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> = GlobalScope.rxObservable(context) {
for (t in this@asObservable)
send(t)
}
-
-/**
- * @suppress **Deprecated**: Renamed to [asCompletable]
- */
-@Deprecated(message = "Renamed to `asCompletable`",
- replaceWith = ReplaceWith("asCompletable(context)"))
-public fun Job.toCompletable(context: CoroutineContext): Completable = asCompletable(context)
-
-/**
- * @suppress **Deprecated**: Renamed to [asSingle]
- */
-@Deprecated(message = "Renamed to `asSingle`",
- replaceWith = ReplaceWith("asSingle(context)"))
-public fun <T> Deferred<T>.toSingle(context: CoroutineContext): Single<T> = asSingle(context)
-
-/**
- * @suppress **Deprecated**: Renamed to [asObservable]
- */
-@Deprecated(message = "Renamed to `asObservable`",
- replaceWith = ReplaceWith("asObservable(context)"))
-public fun <T> ReceiveChannel<T>.toObservable(context: CoroutineContext): Observable<T> = asObservable(context)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
index c137afc..0818def 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
@@ -4,13 +4,10 @@
package kotlinx.coroutines.experimental.rx2
-import io.reactivex.Flowable
-import kotlinx.coroutines.experimental.CoroutineDispatcher
-import kotlinx.coroutines.experimental.CoroutineScope
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.ProducerScope
-import kotlinx.coroutines.experimental.reactive.publish
+import io.reactivex.*
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.reactive.*
import kotlin.coroutines.experimental.*
/**
@@ -27,17 +24,30 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
+public fun <T> CoroutineScope.rxFlowable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend ProducerScope<T>.() -> Unit
+): Flowable<T> = Flowable.fromPublisher(publish(newCoroutineContext(context), block = block))
+
+/**
+ * Creates cold [flowable][Flowable] that will run a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxFlowable] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxFlowable(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx2.rxFlowable"])
+)
@JvmOverloads // for binary compatibility with older code compiled before context had a default
public fun <T> rxFlowable(
context: CoroutineContext = DefaultDispatcher,
block: suspend ProducerScope<T>.() -> Unit
-): Flowable<T> = Flowable.fromPublisher(publish(context, block = block))
+): Flowable<T> = GlobalScope.rxFlowable(context, block)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
index 2d192c1..935981c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
@@ -20,37 +20,38 @@
* | Returns a null | `onComplete`
* | Failure with exception or unsubscribe | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
-public fun <T> rxMaybe(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun <T> CoroutineScope.rxMaybe(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Maybe<T> = Maybe.create { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = RxMaybeCoroutine(newContext, subscriber)
subscriber.setCancellable(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxMaybe] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxMaybe(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx2.rxMaybe"])
+)
public fun <T> rxMaybe(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend CoroutineScope.() -> T?
-): Maybe<T> =
- rxMaybe(context, block = block)
+): Maybe<T> = GlobalScope.rxMaybe(context + (parent ?: EmptyCoroutineContext), block)
private class RxMaybeCoroutine<T>(
parentContext: CoroutineContext,
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index dab014f..18f855d 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -27,37 +27,38 @@
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
-public fun <T> rxObservable(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun <T> CoroutineScope.rxObservable(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
): Observable<T> = Observable.create { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = RxObservableCoroutine(newContext, subscriber)
subscriber.setCancellable(coroutine) // do it first (before starting coroutine), to await unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold [observable][Observable] that will run a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxObservable] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxObservable(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx2.rxObservable"])
+)
public fun <T> rxObservable(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend ProducerScope<T>.() -> Unit
-): Observable<T> =
- rxObservable(context, block = block)
+): Observable<T> = GlobalScope.rxObservable(context + (parent ?: EmptyCoroutineContext), block)
private const val OPEN = 0 // open channel, still working
private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
index f0d402e..20f0091 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
@@ -19,37 +19,38 @@
* | Returns a value | `onSuccess`
* | Failure with exception or unsubscribe | `onError`
*
- * The [context] for the new coroutine can be explicitly specified.
- * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
- * The [coroutineContext] of the parent coroutine may be used,
- * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
- * The parent job may be also explicitly specified using [parent] parameter.
- *
+ * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
+ * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
+ * with corresponding [coroutineContext] element.
*
- * @param context context of the coroutine. The default value is [DefaultDispatcher].
- * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
+ * @param context context of the coroutine.
* @param block the coroutine code.
*/
-public fun <T> rxSingle(
- context: CoroutineContext = DefaultDispatcher,
- parent: Job? = null,
+public fun <T> CoroutineScope.rxSingle(
+ context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
- val newContext = newCoroutineContext(context, parent)
+ val newContext = newCoroutineContext(context)
val coroutine = RxSingleCoroutine(newContext, subscriber)
subscriber.setCancellable(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-/** @suppress **Deprecated**: Binary compatibility */
-@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
-@JvmOverloads // for binary compatibility with older code compiled before context had a default
+/**
+ * Creates cold [single][Single] that will run a given [block] in a coroutine.
+ * @suppress **Deprecated** Use [CoroutineScope.rxSingle] instead.
+ */
+@Deprecated(
+ message = "Standalone coroutine builders are deprecated, use extensions on CoroutineScope instead",
+ replaceWith = ReplaceWith("GlobalScope.rxSingle(context, block)",
+ imports = ["kotlinx.coroutines.experimental.GlobalScope", "kotlinx.coroutines.experimental.rx2.rxSingle"])
+)
public fun <T> rxSingle(
context: CoroutineContext = DefaultDispatcher,
+ parent: Job? = null,
block: suspend CoroutineScope.() -> T
-): Single<T> =
- rxSingle(context, block = block)
+): Single<T> = GlobalScope.rxSingle(context + (parent ?: EmptyCoroutineContext), block)
private class RxSingleCoroutine<T>(
parentContext: CoroutineContext,
diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
index 761600e..e2914a7 100644
--- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
@@ -8,13 +8,12 @@
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
-import kotlin.coroutines.experimental.*
class CompletableTest : TestBase() {
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(4)
}
expect(2)
@@ -27,9 +26,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(4)
throw RuntimeException("OK")
}
@@ -47,9 +46,9 @@
}
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -70,9 +69,9 @@
}
@Test
- fun testAwaitSuccess() = runBlocking<Unit> {
+ fun testAwaitSuccess() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(3)
}
expect(2)
@@ -81,9 +80,9 @@
}
@Test
- fun testAwaitFailure() = runBlocking<Unit> {
+ fun testAwaitFailure() = runBlocking {
expect(1)
- val completable = rxCompletable(coroutineContext) {
+ val completable = rxCompletable {
expect(3)
throw RuntimeException("OK")
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
index 8b7fbfc..5284725 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt
@@ -13,9 +13,9 @@
class TestException(s: String): RuntimeException(s)
@Test
- fun testToCompletableSuccess() = runBlocking<Unit> {
+ fun testToCompletableSuccess() = runBlocking {
expect(1)
- val job = launch(coroutineContext) {
+ val job = launch {
expect(3)
}
val completable = job.asCompletable(coroutineContext)
@@ -28,9 +28,9 @@
}
@Test
- fun testToCompletableFail() = runBlocking<Unit> {
+ fun testToCompletableFail() = runBlocking {
expect(1)
- val job = async(coroutineContext + NonCancellable) { // don't kill parent on exception
+ val job = async(NonCancellable) { // don't kill parent on exception
expect(3)
throw RuntimeException("OK")
}
@@ -121,7 +121,7 @@
@Test
fun testToObservable() {
- val c = GlobalScope.produce(DefaultDispatcher) {
+ val c = GlobalScope.produce {
delay(50)
send("O")
delay(50)
@@ -135,14 +135,14 @@
@Test
fun testToObservableFail() {
- val c = GlobalScope.produce(DefaultDispatcher) {
+ val c = GlobalScope.produce {
delay(50)
send("O")
delay(50)
throw TestException("K")
}
val observable = c.asObservable(Unconfined)
- val single = rxSingle(Unconfined) {
+ val single = GlobalScope.rxSingle(Unconfined) {
var result = ""
try {
observable.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
index e85646f..415a65b 100644
--- a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
@@ -7,13 +7,12 @@
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.*
import org.junit.*
-import kotlin.coroutines.experimental.*
class FlowableTest : TestBase() {
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val observable = rxFlowable(coroutineContext) {
+ val observable = rxFlowable {
expect(4)
send("OK")
}
@@ -28,9 +27,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val observable = rxFlowable<String>(coroutineContext) {
+ val observable = rxFlowable<String> {
expect(4)
throw RuntimeException("OK")
}
@@ -48,9 +47,9 @@
}
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val observable = rxFlowable<String>(coroutineContext) {
+ val observable = rxFlowable<String> {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
index f04b609..5e612dc 100644
--- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
@@ -15,8 +15,8 @@
@RunWith(Parameterized::class)
class IntegrationTest(
- val ctx: Ctx,
- val delay: Boolean
+ private val ctx: Ctx,
+ private val delay: Boolean
) : TestBase() {
enum class Ctx {
@@ -57,8 +57,8 @@
}
@Test
- fun testSingle() = runBlocking<Unit> {
- val observable = rxObservable<String>(ctx(coroutineContext)) {
+ fun testSingle() = runBlocking {
+ val observable = CoroutineScope(ctx(coroutineContext)).rxObservable {
if (delay) delay(1)
send("OK")
}
@@ -79,7 +79,7 @@
@Test
fun testNumbers() = runBlocking<Unit> {
val n = 100 * stressTestMultiplier
- val observable = rxObservable<Int>(ctx(coroutineContext)) {
+ val observable = rxObservable(ctx(coroutineContext)) {
for (i in 1..n) {
send(i)
if (delay) delay(1)
@@ -100,7 +100,7 @@
@Test
fun testCancelWithoutValue() = runTest {
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
- rxObservable<String>(coroutineContext) {
+ rxObservable<String> {
yield()
expectUnreached()
}.awaitFirst()
@@ -114,7 +114,7 @@
fun testEmptySingle() = runTest(unhandled = listOf({e -> e is NoSuchElementException})) {
expect(1)
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
- rxObservable<String>(coroutineContext) {
+ rxObservable<String> {
yield()
expect(2)
// Nothing to emit
@@ -134,7 +134,7 @@
}
- inline fun assertIAE(block: () -> Unit) {
+ private inline fun assertIAE(block: () -> Unit) {
try {
block()
expectUnreached()
@@ -143,7 +143,7 @@
}
}
- inline fun assertNSE(block: () -> Unit) {
+ private inline fun assertNSE(block: () -> Unit) {
try {
block()
expectUnreached()
diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
index df462bc..b5ca5b9 100644
--- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
@@ -12,7 +12,6 @@
import org.junit.*
import org.junit.Assert.*
import java.util.concurrent.*
-import kotlin.coroutines.experimental.*
class MaybeTest : TestBase() {
@Before
@@ -21,9 +20,9 @@
}
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val maybe = rxMaybe(coroutineContext) {
+ val maybe = rxMaybe {
expect(4)
"OK"
}
@@ -38,9 +37,9 @@
}
@Test
- fun testBasicEmpty() = runBlocking<Unit> {
+ fun testBasicEmpty() = runBlocking {
expect(1)
- val maybe = rxMaybe(coroutineContext) {
+ val maybe = rxMaybe {
expect(4)
null
}
@@ -54,9 +53,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val maybe = rxMaybe(coroutineContext) {
+ val maybe = rxMaybe {
expect(4)
throw RuntimeException("OK")
}
@@ -75,9 +74,9 @@
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val maybe = rxMaybe(coroutineContext) {
+ val maybe = rxMaybe {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -99,7 +98,7 @@
@Test
fun testMaybeNoWait() {
- val maybe = rxMaybe(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe {
"OK"
}
@@ -120,7 +119,7 @@
@Test
fun testMaybeEmitAndAwait() {
- val maybe = rxMaybe(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe {
Maybe.just("O").await() + "K"
}
@@ -131,7 +130,7 @@
@Test
fun testMaybeWithDelay() {
- val maybe = rxMaybe(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe {
Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
}
@@ -142,7 +141,7 @@
@Test
fun testMaybeException() {
- val maybe = rxMaybe(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe {
Observable.just("O", "K").awaitSingle() + "K"
}
@@ -153,7 +152,7 @@
@Test
fun testAwaitFirst() {
- val maybe = rxMaybe(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe {
Observable.just("O", "#").awaitFirst() + "K"
}
@@ -164,7 +163,7 @@
@Test
fun testAwaitLast() {
- val maybe = rxMaybe(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe {
Observable.just("#", "O").awaitLast() + "K"
}
@@ -175,7 +174,7 @@
@Test
fun testExceptionFromObservable() {
- val maybe = rxMaybe(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe {
try {
Observable.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
@@ -190,7 +189,7 @@
@Test
fun testExceptionFromCoroutine() {
- val maybe = rxMaybe<String>(DefaultDispatcher) {
+ val maybe = GlobalScope.rxMaybe<String> {
throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt
index f219003..92ca2f3 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableCompletionStressTest.kt
@@ -4,18 +4,15 @@
package kotlinx.coroutines.experimental.rx2
-import kotlinx.coroutines.experimental.DefaultDispatcher
-import kotlinx.coroutines.experimental.TestBase
-import kotlinx.coroutines.experimental.runBlocking
-import kotlinx.coroutines.experimental.withTimeout
-import org.junit.Test
+import kotlinx.coroutines.experimental.*
+import org.junit.*
import java.util.*
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.*
class ObservableCompletionStressTest : TestBase() {
- val N_REPEATS = 10_000 * stressTestMultiplier
+ private val N_REPEATS = 10_000 * stressTestMultiplier
- fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable<Int>(context) {
+ private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = rxObservable(context) {
for (x in start until start + count) send(x)
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
index 710d2e0..2e0596d 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt
@@ -17,7 +17,7 @@
@Test
fun testNumbers() {
val n = 100 * stressTestMultiplier
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
repeat(n) { send(it) }
}
checkSingleValue(observable.toList()) { list ->
@@ -28,7 +28,7 @@
@Test
fun testConcurrentStress() {
val n = 10_000 * stressTestMultiplier
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
// concurrent emitters (many coroutines)
val jobs = List(n) {
// launch
@@ -47,7 +47,7 @@
@Test
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
- val observable = rxObservable(Unconfined) {
+ val observable = GlobalScope.rxObservable(Unconfined) {
Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
@@ -58,7 +58,7 @@
@Test
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
Observable.range(0, n).consumeEach { send(it) }
}
checkSingleValue(observable.toList()) { list ->
@@ -68,11 +68,11 @@
@Test
fun testSendAndCrash() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send("O")
throw IOException("K")
}
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
var result = ""
try {
observable.consumeEach { result += it }
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt
index 1dc9cb0..a2b18ef 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt
@@ -10,13 +10,10 @@
import org.junit.Assert.*
import java.util.concurrent.*
-/**
- * Tests emitting single item with [rxObservable].
- */
class ObservableSingleTest {
@Test
fun testSingleNoWait() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send("OK")
}
@@ -32,7 +29,7 @@
@Test
fun testSingleEmitAndAwait() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O").awaitSingle() + "K")
}
@@ -43,7 +40,7 @@
@Test
fun testSingleWithDelay() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
}
@@ -54,7 +51,7 @@
@Test
fun testSingleException() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "K").awaitSingle() + "K")
}
@@ -65,7 +62,7 @@
@Test
fun testAwaitFirst() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "#").awaitFirst() + "K")
}
@@ -76,7 +73,7 @@
@Test
fun testAwaitFirstOrDefault() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
}
@@ -87,7 +84,7 @@
@Test
fun testAwaitFirstOrDefaultWithValues() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
}
@@ -98,7 +95,7 @@
@Test
fun testAwaitFirstOrNull() {
- val observable = rxObservable<String>(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable<String> {
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -109,7 +106,7 @@
@Test
fun testAwaitFirstOrNullWithValues() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
}
@@ -120,7 +117,7 @@
@Test
fun testAwaitFirstOrElse() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
}
@@ -131,7 +128,7 @@
@Test
fun testAwaitFirstOrElseWithValues() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
}
@@ -142,7 +139,7 @@
@Test
fun testAwaitLast() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
send(Observable.just("#", "O").awaitLast() + "K")
}
@@ -153,7 +150,7 @@
@Test
fun testExceptionFromObservable() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
try {
send(Observable.error<String>(RuntimeException("O")).awaitFirst())
} catch (e: RuntimeException) {
@@ -168,7 +165,7 @@
@Test
fun testExceptionFromCoroutine() {
- val observable = rxObservable<String>(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable<String> {
error(Observable.just("O").awaitSingle() + "K")
}
@@ -180,7 +177,7 @@
@Test
fun testObservableIteration() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
var result = ""
Observable.just("O", "K").consumeEach { result += it }
send(result)
@@ -193,7 +190,7 @@
@Test
fun testObservableIterationFailure() {
- val observable = rxObservable(DefaultDispatcher) {
+ val observable = GlobalScope.rxObservable {
try {
Observable.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
send("Fail")
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt
index 6211e1b..e56dfdc 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt
@@ -8,14 +8,13 @@
import kotlinx.coroutines.experimental.selects.*
import org.junit.*
import org.junit.Assert.*
-import kotlin.coroutines.experimental.*
-class ObservableSubscriptionSelectTest() : TestBase() {
+class ObservableSubscriptionSelectTest : TestBase() {
@Test
fun testSelect() = runTest {
// source with n ints
val n = 1000 * stressTestMultiplier
- val source = rxObservable(coroutineContext) { repeat(n) { send(it) } }
+ val source = rxObservable { repeat(n) { send(it) } }
var a = 0
var b = 0
// open two subs
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
index 84c5450..0e2575b 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
@@ -7,13 +7,12 @@
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.*
import org.junit.*
-import kotlin.coroutines.experimental.*
class ObservableTest : TestBase() {
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val observable = rxObservable(coroutineContext) {
+ val observable = rxObservable {
expect(4)
send("OK")
}
@@ -28,9 +27,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val observable = rxObservable<String>(coroutineContext) {
+ val observable = rxObservable<String> {
expect(4)
throw RuntimeException("OK")
}
@@ -48,9 +47,9 @@
}
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val observable = rxObservable<String>(coroutineContext) {
+ val observable = rxObservable<String> {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
index d9e5d32..fe04727 100644
--- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
@@ -10,11 +10,7 @@
import org.junit.*
import org.junit.Assert.*
import java.util.concurrent.*
-import kotlin.coroutines.experimental.*
-/**
- * Tests emitting single item with [rxSingle].
- */
class SingleTest : TestBase() {
@Before
fun setup() {
@@ -22,9 +18,9 @@
}
@Test
- fun testBasicSuccess() = runBlocking<Unit> {
+ fun testBasicSuccess() = runBlocking {
expect(1)
- val single = rxSingle(coroutineContext) {
+ val single = rxSingle {
expect(4)
"OK"
}
@@ -39,9 +35,9 @@
}
@Test
- fun testBasicFailure() = runBlocking<Unit> {
+ fun testBasicFailure() = runBlocking {
expect(1)
- val single = rxSingle(coroutineContext) {
+ val single = rxSingle {
expect(4)
throw RuntimeException("OK")
}
@@ -60,9 +56,9 @@
@Test
- fun testBasicUnsubscribe() = runBlocking<Unit> {
+ fun testBasicUnsubscribe() = runBlocking {
expect(1)
- val single = rxSingle(coroutineContext) {
+ val single = rxSingle {
expect(4)
yield() // back to main, will get cancelled
expectUnreached()
@@ -84,7 +80,7 @@
@Test
fun testSingleNoWait() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
"OK"
}
@@ -100,7 +96,7 @@
@Test
fun testSingleEmitAndAwait() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Single.just("O").await() + "K"
}
@@ -111,7 +107,7 @@
@Test
fun testSingleWithDelay() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
}
@@ -122,7 +118,7 @@
@Test
fun testSingleException() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.just("O", "K").awaitSingle() + "K"
}
@@ -133,7 +129,7 @@
@Test
fun testAwaitFirst() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.just("O", "#").awaitFirst() + "K"
}
@@ -144,7 +140,7 @@
@Test
fun testAwaitLast() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
Observable.just("#", "O").awaitLast() + "K"
}
@@ -155,7 +151,7 @@
@Test
fun testExceptionFromObservable() {
- val single = rxSingle(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle {
try {
Observable.error<String>(RuntimeException("O")).awaitFirst()
} catch (e: RuntimeException) {
@@ -170,7 +166,7 @@
@Test
fun testExceptionFromCoroutine() {
- val single = rxSingle<String>(DefaultDispatcher) {
+ val single = GlobalScope.rxSingle<String> {
throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt
index 4cdc4c4..a80d5df 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt
@@ -7,11 +7,10 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
-import kotlin.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking<Unit> {
// create a channel that produces numbers from 1 to 3 with 200ms delays between them
- val source = produce<Int>(coroutineContext) {
+ val source = produce<Int> {
println("Begin") // mark the beginning of this coroutine in output
for (x in 1..3) {
delay(200) // wait for 200ms
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt
index 5ce8b54..c4d5ee0 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt
@@ -7,11 +7,10 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
-import kotlin.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking<Unit> {
// create a publisher that produces numbers from 1 to 3 with 200ms delays between them
- val source = publish<Int>(coroutineContext) {
+ val source = publish<Int> {
// ^^^^^^^ <--- Difference from the previous examples is here
println("Begin") // mark the beginning of this coroutine in output
for (x in 1..3) {
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt
index 305cad1..55e98a3 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt
@@ -8,11 +8,10 @@
import io.reactivex.schedulers.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.rx2.*
-import kotlin.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking<Unit> {
// coroutine -- fast producer of elements in the context of the main thread
- val source = rxFlowable(coroutineContext) {
+ val source = rxFlowable {
for (x in 1..3) {
send(x) // this is a suspending function
println("Sent $x") // print after successfully sent item
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt
index f3a4bf6..ebfba2c 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt
@@ -8,14 +8,13 @@
import io.reactivex.subjects.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.rx2.*
-import kotlin.coroutines.experimental.*
fun main(args: Array<String>) = runBlocking<Unit> {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two")
// now launch a coroutine to print the most recent update
- launch(coroutineContext) { // use the context of the main thread for a coroutine
+ launch { // use the context of the main thread for a coroutine
subject.consumeEach { println(it) }
}
subject.onNext("three")
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt
index 2529d5b..3c9ea39 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt
@@ -5,16 +5,15 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package kotlinx.coroutines.experimental.rx2.guide.basic09
-import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.*
-import kotlin.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
fun main(args: Array<String>) = runBlocking<Unit> {
val broadcast = ConflatedBroadcastChannel<String>()
broadcast.offer("one")
broadcast.offer("two")
// now launch a coroutine to print the most recent update
- launch(coroutineContext) { // use the context of the main thread for a coroutine
+ launch { // use the context of the main thread for a coroutine
broadcast.consumeEach { println(it) }
}
broadcast.offer("three")
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt
index ff84fd0..87f2bdd 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt
@@ -8,9 +8,9 @@
import io.reactivex.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.*
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt
index 9aa4b45..8e82d36 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt
@@ -6,12 +6,12 @@
package kotlinx.coroutines.experimental.rx2.guide.context03
import io.reactivex.*
+import io.reactivex.schedulers.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
-import io.reactivex.schedulers.Schedulers
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.*
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt
index b30b1e7..9177e64 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt
@@ -7,12 +7,13 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.*
-import kotlin.coroutines.experimental.CoroutineContext
+import kotlin.coroutines.experimental.*
-fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) send(x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
- range(CommonPool, 1, 5).consumeEach { println(it) }
+ // Range inherits parent job from runBlocking, but overrides dispatcher with DefaultDispatcher
+ range(DefaultDispatcher, 1, 5).consumeEach { println(it) }
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt
index 261b58a..dca2745 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt
@@ -14,19 +14,19 @@
context: CoroutineContext, // the context to execute this coroutine in
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
-) = publish<R>(context) {
+) = GlobalScope.publish<R>(context) {
consumeEach { // consume the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
}
}
-fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.range(start: Int, count: Int) = publish<Int> {
for (x in start until start + count) send(x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
- range(coroutineContext, 1, 5)
+ range(1, 5)
.fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
.consumeEach { println(it) } // print all the resulting strings
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt
index cbfc728..7db378e 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt
@@ -5,14 +5,14 @@
// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit.
package kotlinx.coroutines.experimental.rx2.guide.operators03
-import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.reactive.*
import kotlinx.coroutines.experimental.selects.*
import org.reactivestreams.*
import kotlin.coroutines.experimental.*
-fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
+fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = GlobalScope.publish<T>(context) {
this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
val current = this
other.openSubscription().consume { // explicitly open channel to Publisher<U>
@@ -25,7 +25,7 @@
}
}
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
@@ -33,7 +33,7 @@
}
fun main(args: Array<String>) = runBlocking<Unit> {
- val slowNums = rangeWithInterval(coroutineContext, 200, 1, 10) // numbers with 200ms interval
- val stop = rangeWithInterval(coroutineContext, 500, 1, 10) // the first one after 500ms
+ val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
+ val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms
slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt
index e511be2..002817f 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt
@@ -10,28 +10,28 @@
import org.reactivestreams.*
import kotlin.coroutines.experimental.*
-fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
+fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
consumeEach { pub -> // for each publisher received on the source channel
- launch(coroutineContext) { // launch a child coroutine
+ launch { // launch a child coroutine
pub.consumeEach { send(it) } // resend all element from this publisher
}
}
}
-fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
+fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
}
}
-fun testPub(context: CoroutineContext) = publish<Publisher<Int>>(context) {
- send(rangeWithInterval(context, 250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
+fun CoroutineScope.testPub() = publish<Publisher<Int>> {
+ send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
delay(100) // wait for 100 ms
- send(rangeWithInterval(context, 500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
+ send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
delay(1100) // wait for 1.1s - done in 1.2 sec after start
}
fun main(args: Array<String>) = runBlocking<Unit> {
- testPub(coroutineContext).merge(coroutineContext).consumeEach { println(it) } // print the whole stream
+ testPub().merge(coroutineContext).consumeEach { println(it) } // print the whole stream
}