Coroutine context propagation for Reactor to coroutines API migration (#1377)
* Propagation of the coroutine context of await calls into
Mono/Flux builder
* Publisher.asFlow propagates coroutine context from `collect`
call to the Publisher
* Flow.asFlux transform
* Optimized FlowSubscription
* kotlinx.coroutines.reactor.flow package is replaced with kotlinx.coroutines.reactor
Fixes #284
diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt
index d12a628..072773a 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Await.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt
@@ -10,6 +10,7 @@
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
+import java.util.*
import kotlin.coroutines.*
/**
@@ -81,6 +82,16 @@
// ------------------------ private ------------------------
+// The ContextInjector service is implemented in the `kotlinx-coroutines-reactor` module only.
+// If the `kotlinx-coroutines-reactor` module is not included, the array is empty.
+private val contextInjectors: Array<ContextInjector> =
+ ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto
+
+private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
+ contextInjectors.fold(this) { pub, contextInjector ->
+ contextInjector.injectCoroutineContext(pub, coroutineContext) // pub is the result of the previous injector (initially this publisher)
+ }
+
private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
@@ -93,7 +104,7 @@
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
- subscribe(object : Subscriber<T> {
+ injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private lateinit var subscription: Subscription
private var value: T? = null
private var seenValue = false
diff --git a/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
new file mode 100644
index 0000000..45f6553
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
@@ -0,0 +1,15 @@
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.InternalCoroutinesApi
+import org.reactivestreams.Publisher
+import kotlin.coroutines.CoroutineContext
+
+/** @suppress */
+@InternalCoroutinesApi
+public interface ContextInjector {
+ /**
+ * Injects the `ReactorContext` element from the given context into the `SubscriberContext` of the publisher.
+ * This API is used as an indirection layer between the `reactive` and `reactor` modules.
+ *
+ * @param publisher the publisher to inject the context into.
+ * @param coroutineContext the coroutine context the element is taken from.
+ * @return the publisher to use in place of [publisher].
+ */
+ public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
new file mode 100644
index 0000000..387c8e7
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.reactivestreams.*
+import kotlinx.coroutines.intrinsics.*
+
+/**
+ * Transforms the given flow to a spec-compliant [Publisher].
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
+
+/**
+ * Adapter that transforms [Flow] into a TCK-compliant [Publisher].
+ * [cancel] invocation cancels the original flow.
+ */
+@Suppress("PublisherImplementation")
+private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
+ override fun subscribe(subscriber: Subscriber<in T>?) {
+ if (subscriber == null) throw NullPointerException() // the parameter type is nullable; a null subscriber is rejected explicitly
+ subscriber.onSubscribe(FlowSubscription(flow, subscriber)) // every subscriber gets its own FlowSubscription
+ }
+}
+
+/**
+ * [Subscription] that collects [flow] and passes every value to [subscriber].
+ * A value is handed to the subscriber only while the demand accumulated by [request] is positive;
+ * otherwise the collector suspends until more demand is requested.
+ *
+ * @suppress
+ */
+@InternalCoroutinesApi
+public class FlowSubscription<T>(
+ @JvmField val flow: Flow<T>,
+ @JvmField val subscriber: Subscriber<in T>
+) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) {
+ private val requested = atomic(0L) // outstanding demand: raised by request, lowered by one per emitted value
+ private val producer = atomic<CancellableContinuation<Unit>?>(null) // continuation of the collector while it waits for demand
+
+ override fun onStart() {
+ ::flowProcessing.startCoroutineCancellable(this) // NOTE(review): presumably triggered by the start() call in request — confirm
+ }
+
+ private suspend fun flowProcessing() {
+ try {
+ consumeFlow()
+ subscriber.onComplete() // the flow finished without an exception
+ } catch (e: Throwable) {
+ try {
+ if (e is CancellationException) { // cancellation is signalled to the subscriber as completion, not as an error
+ subscriber.onComplete()
+ } else {
+ subscriber.onError(e)
+ }
+ } catch (e: Throwable) { // the subscriber callback itself threw
+ // Last ditch report
+ handleCoroutineException(coroutineContext, e)
+ }
+ }
+ }
+
+ /*
+ * This method has at most one caller at any time (triggered from the `request` method)
+ */
+ private suspend fun consumeFlow() {
+ flow.collect { value ->
+ /*
+ * Flow is scopeless, thus if it's not active, its subscription was cancelled.
+ * No intermediate "child failed, but flow coroutine is not" states are allowed.
+ */
+ coroutineContext.ensureActive()
+ if (requested.value <= 0L) { // no demand left: suspend until request supplies some
+ suspendCancellableCoroutine<Unit> {
+ producer.value = it // publish the continuation so that request can resume it
+ if (requested.value != 0L) it.resumeSafely() // re-check: demand may have arrived before the continuation was published
+ }
+ }
+ requested.decrementAndGet() // one unit of demand is consumed per value
+ subscriber.onNext(value)
+ }
+ }
+
+ override fun cancel() {
+ cancel(null) // delegates to the one-argument cancel overload
+ }
+
+ override fun request(n: Long) {
+ if (n <= 0) { // non-positive requests are silently ignored
+ return
+ }
+ start() // NOTE(review): presumably a no-op once the coroutine has been started — confirm
+ requested.update { value ->
+ val newValue = value + n
+ if (newValue <= 0L) Long.MAX_VALUE else newValue // a non-positive sum (Long overflow) saturates at Long.MAX_VALUE
+ }
+ val producer = producer.getAndSet(null) ?: return // take the waiting continuation, if any; otherwise nothing to resume
+ producer.resumeSafely()
+ }
+
+ private fun CancellableContinuation<Unit>.resumeSafely() {
+ val token = tryResume(Unit) // NOTE(review): a null token presumably means the continuation can no longer be resumed — confirm
+ if (token != null) {
+ completeResume(token)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
similarity index 83%
rename from reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
rename to reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
index 50338de..8da106e 100644
--- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
@@ -2,14 +2,17 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
+import java.util.*
import kotlin.coroutines.*
/**
@@ -21,13 +24,11 @@
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
* are discarded.
*/
-@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this, 1)
@FlowPreview
-@JvmName("from")
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.ERROR,
@@ -46,7 +47,9 @@
// use another channel for conflation (cannot do openSubscription)
if (capacity < 0) return super.produceImpl(scope)
// Open subscription channel directly
- val channel = publisher.openSubscription(capacity)
+ val channel = publisher
+ .injectCoroutineContext(scope.coroutineContext)
+ .openSubscription(capacity)
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
channel.cancel(cause?.let {
it as? CancellationException ?: CancellationException("Job was cancelled", it)
@@ -70,7 +73,7 @@
override suspend fun collect(collector: FlowCollector<T>) {
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
- publisher.subscribe(subscriber)
+ publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
try {
var consumed = 0L
while (true) {
@@ -127,3 +130,11 @@
subscription.cancel()
}
}
+
+// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
+// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
+private val contextInjectors: List<ContextInjector> =
+ ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()
+
+private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
+ contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
deleted file mode 100644
index 05f2391..0000000
--- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.reactive.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
-import org.reactivestreams.*
-import java.util.concurrent.atomic.*
-import kotlin.coroutines.*
-
-/**
- * Transforms the given flow to a spec-compliant [Publisher].
- */
-@JvmName("from")
-@ExperimentalCoroutinesApi
-public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
-
-/**
- * Adapter that transforms [Flow] into TCK-complaint [Publisher].
- * [cancel] invocation cancels the original flow.
- */
-@Suppress("PublisherImplementation")
-private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
-
- override fun subscribe(subscriber: Subscriber<in T>?) {
- if (subscriber == null) throw NullPointerException()
- subscriber.onSubscribe(FlowSubscription(flow, subscriber))
- }
-
- private class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription {
- @Volatile
- internal var canceled: Boolean = false
- private val requested = AtomicLong(0L)
- private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()
-
- // This is actually optimizable
- private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
- try {
- consumeFlow()
- subscriber.onComplete()
- } catch (e: Throwable) {
- // Failed with real exception, not due to cancellation
- if (!coroutineContext[Job]!!.isCancelled) {
- subscriber.onError(e)
- }
- }
- }
-
- private suspend fun consumeFlow() {
- flow.collect { value ->
- if (!coroutineContext.isActive) {
- subscriber.onComplete()
- coroutineContext.ensureActive()
- }
-
- if (requested.get() == 0L) {
- suspendCancellableCoroutine<Unit> {
- producer.set(it)
- if (requested.get() != 0L) it.resumeSafely()
- }
- }
-
- requested.decrementAndGet()
- subscriber.onNext(value)
- }
- }
-
- override fun cancel() {
- canceled = true
- job.cancel()
- }
-
- override fun request(n: Long) {
- if (n <= 0) {
- return
- }
-
- if (canceled) return
-
- job.start()
- var snapshot: Long
- var newValue: Long
- do {
- snapshot = requested.get()
- newValue = snapshot + n
- if (newValue <= 0L) newValue = Long.MAX_VALUE
- } while (!requested.compareAndSet(snapshot, newValue))
-
- val prev = producer.get()
- if (prev == null || !producer.compareAndSet(prev, null)) return
- prev.resumeSafely()
- }
-
- private fun CancellableContinuation<Unit>.resumeSafely() {
- val token = tryResume(Unit)
- if (token != null) {
- completeResume(token)
- }
- }
- }
-}
diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
new file mode 100644
index 0000000..8633492
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Test
+import org.reactivestreams.*
+import kotlin.test.*
+
+class FlowAsPublisherTest : TestBase() {
+
+ @Test
+ fun testErrorOnCancellationIsReported() {
+ expect(1)
+ flow<Int> {
+ emit(2)
+ try {
+ hang { expect(3) }
+ } finally {
+ throw TestException()
+ }
+ }.asPublisher().subscribe(object : Subscriber<Int> {
+ private lateinit var subscription: Subscription
+
+ override fun onComplete() {
+ expectUnreached()
+ }
+
+ override fun onSubscribe(s: Subscription?) {
+ subscription = s!!
+ subscription.request(2)
+ }
+
+ override fun onNext(t: Int) {
+ expect(t)
+ subscription.cancel()
+ }
+
+ override fun onError(t: Throwable?) {
+ assertTrue(t is TestException)
+ expect(4)
+ }
+ })
+ finish(5)
+ }
+
+ @Test
+ fun testCancellationIsNotReported() {
+ expect(1)
+ flow<Int> {
+ emit(2)
+ hang { expect(3) }
+ }.asPublisher().subscribe(object : Subscriber<Int> {
+ private lateinit var subscription: Subscription
+
+ override fun onComplete() {
+ expect(4)
+ }
+
+ override fun onSubscribe(s: Subscription?) {
+ subscription = s!!
+ subscription.request(2)
+ }
+
+ override fun onNext(t: Int) {
+ expect(t)
+ subscription.cancel()
+ }
+
+ override fun onError(t: Throwable?) {
+ expectUnreached()
+ }
+ })
+ finish(5)
+ }
+}
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
similarity index 98%
rename from reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
index 31c5a3c..5dfd9d5 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
@@ -4,7 +4,7 @@
@file:Suppress("UNCHECKED_CAST")
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.flow.*
import org.junit.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
similarity index 97%
rename from reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
index 3f33b33..a37719d 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
@@ -2,12 +2,11 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.reactive.*
import kotlin.test.*
class PublisherAsFlowTest : TestBase() {
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
similarity index 95%
rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
index 2ff96eb..b710c59 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
@@ -2,7 +2,7 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import kotlinx.coroutines.flow.*
import org.junit.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
similarity index 96%
rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
index 1b37ee9..72d5de5 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
@@ -2,7 +2,7 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import org.junit.*
import org.reactivestreams.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
similarity index 97%
rename from reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
index 9e61100..63d444c 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
@@ -2,7 +2,7 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
import org.junit.*
import org.reactivestreams.example.unicast.AsyncIterablePublisher
diff --git a/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector
new file mode 100644
index 0000000..0097ec3
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector
@@ -0,0 +1 @@
+kotlinx.coroutines.reactor.ReactorContextInjector
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt
new file mode 100644
index 0000000..7c6182b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt
@@ -0,0 +1,26 @@
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.flowOn
+import kotlinx.coroutines.reactive.FlowSubscription
+import reactor.core.CoreSubscriber
+import reactor.core.publisher.Flux
+
+/**
+ * Converts the given flow to a cold flux.
+ * The original flow is cancelled when the flux subscriber is disposed.
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
+
+/**
+ * [Flux] adapter over [flow]. Every subscription collects the flow through a new [FlowSubscription].
+ * When the subscriber carries a non-empty Reactor context, that context is converted with
+ * [asCoroutineContext] and applied to the flow with [flowOn]; otherwise the flow is collected as is.
+ */
+private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
+    override fun subscribe(subscriber: CoreSubscriber<in T>?) {
+        if (subscriber == null) throw NullPointerException()
+        val context = subscriber.currentContext()
+        // Propagate the context only when there is something in it: the previous check was inverted,
+        // so a non-empty subscriber context never reached the flow.
+        val hasContext = !context.isEmpty
+        val source = if (hasContext) flow.flowOn(context.asCoroutineContext()) else flow
+        subscriber.onSubscribe(FlowSubscription(source, subscriber))
+    }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 316146b..18b84ac 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -74,4 +74,4 @@
val coroutine = PublisherCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
-}
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
index 5a4ccd0..942ba7b 100644
--- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -30,6 +30,18 @@
* .subscribe()
* }
* ```
+ *
+ * The [CoroutineContext] of a suspending function that awaits a value from a [Mono] or [Flux] instance
+ * is propagated into the [mono] and [flux] Reactor builders:
+ * ```
+ * launch(Context.of("key", "value").asCoroutineContext()) {
+ * assertEquals(bar().awaitFirst(), "value")
+ * }
+ *
+ * fun bar(): Mono<String> = mono {
+ * coroutineContext[ReactorContext]!!.context.get("key")
+ * }
+ * ```
*/
@ExperimentalCoroutinesApi
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
new file mode 100644
index 0000000..68309bb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
@@ -0,0 +1,22 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.reactive.*
+import org.reactivestreams.*
+import reactor.core.publisher.*
+import reactor.util.context.*
+import kotlin.coroutines.*
+
+internal class ReactorContextInjector : ContextInjector {
+ /**
+ * Injects all values from the [ReactorContext] entry of the given coroutine context
+ * into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux].
+ *
+ * Returns [publisher] unchanged when [coroutineContext] has no [ReactorContext] element
+ * or when [publisher] is neither a [Mono] nor a [Flux].
+ */
+ override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
+ val reactorContext = coroutineContext[ReactorContext]?.context ?: return publisher // nothing to inject
+ return when(publisher) {
+ is Mono -> publisher.subscriberContext(reactorContext)
+ is Flux -> publisher.subscriberContext(reactorContext)
+ else -> publisher // other Publisher implementations are passed through untouched
+ }
+ }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
index 120cd72..80feaeb 100644
--- a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
@@ -7,7 +7,6 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
-import kotlinx.coroutines.reactive.flow.*
import org.junit.Test
import reactor.core.publisher.*
import kotlin.test.*
diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
new file mode 100644
index 0000000..2f8ce9a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
@@ -0,0 +1,27 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import reactor.core.publisher.Mono
+import reactor.util.context.Context
+import kotlin.test.assertEquals
+
+class FlowAsFluxTest {
+ @Test
+ fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
+ val flux = flow<String> {
+ (1..4).forEach { i -> emit(m(i).awaitFirst()) }
+ } .asFlux()
+ .subscriberContext(Context.of(1, "1"))
+ .subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
+ var i = 0
+ flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) }
+ }
+
+ private fun m(i: Int): Mono<String> = mono {
+ val ctx = coroutineContext[ReactorContext]?.context
+ ctx?.getOrDefault(i, "noValue")
+ }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
index 1fb4f0b..e9ac200 100644
--- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
@@ -1,10 +1,13 @@
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
-import reactor.util.context.Context
-import kotlin.test.assertEquals
+import reactor.core.publisher.*
+import reactor.util.context.*
+import kotlin.test.*
class ReactorContextTest {
@Test
@@ -14,8 +17,8 @@
buildString {
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
}
- } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
- .subscriberContext { ctx -> ctx.put(6, "6") }
+ } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
+ .subscriberContext { ctx -> ctx.put(6, "6") }
assertEquals(mono.awaitFirst(), "1234567")
}
@@ -29,4 +32,80 @@
var i = 0
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
}
+
+ @Test
+ fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) {
+ val result = mono(Context.of(1, "1").asCoroutineContext()) {
+ val ctx = coroutineContext[ReactorContext]?.context
+ buildString {
+ (1..3).forEach { append(ctx?.getOrDefault(it, "noValue")) }
+ }
+ } .subscriberContext(Context.of(2, "2"))
+ .awaitFirst()
+ assertEquals(result, "123")
+ }
+
+ @Test
+ fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
+ assertEquals(m().awaitFirst(), "7")
+ assertEquals(m().awaitFirstOrDefault("noValue"), "7")
+ assertEquals(m().awaitFirstOrNull(), "7")
+ assertEquals(m().awaitFirstOrElse { "noValue" }, "7")
+ assertEquals(m().awaitLast(), "7")
+ assertEquals(m().awaitSingle(), "7")
+ }
+
+ @Test
+ fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
+ Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+ ) {
+ assertEquals(f().awaitFirst(), "1")
+ assertEquals(f().awaitFirstOrDefault("noValue"), "1")
+ assertEquals(f().awaitFirstOrNull(), "1")
+ assertEquals(f().awaitFirstOrElse { "noValue" }, "1")
+ assertEquals(f().awaitLast(), "3")
+ var i = 0
+ f().subscribe { str -> i++; assertEquals(str, i.toString()) }
+ }
+
+ private fun m(): Mono<String> = mono {
+ val ctx = coroutineContext[ReactorContext]?.context
+ ctx?.getOrDefault(7, "noValue")
+ }
+
+
+ private fun f(): Flux<String?> = flux {
+ val ctx = coroutineContext[ReactorContext]?.context
+ (1..3).forEach { send(ctx?.getOrDefault(it, "noValue")) }
+ }
+
+ @Test
+ fun testFlowToFluxContextPropagation() = runBlocking(
+ Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+ ) {
+ var i = 0
+ // call "collect" on the converted Flow
+ bar().collect { str ->
+ i++; assertEquals(str, i.toString())
+ }
+ assertEquals(i, 3)
+ }
+
+ @Test
+ fun testFlowToFluxDirectContextPropagation() = runBlocking(
+ Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+ ) {
+ var i = 0
+ // convert resulting flow to channel using "produceIn"
+ val channel = bar().produceIn(this)
+ channel.consumeEach { str ->
+ i++; assertEquals(str, i.toString())
+ }
+ assertEquals(i, 3)
+ }
+
+ private fun bar(): Flow<String> = flux {
+ val ctx = coroutineContext[ReactorContext]!!.context
+ (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
+ }.asFlow()
}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index d5678de..4b12127 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -8,8 +8,7 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.reactive.flow.*
-import org.reactivestreams.*
+import kotlinx.coroutines.reactive.*
import kotlin.coroutines.*
/**
@@ -82,7 +81,7 @@
/**
* Converts the given flow to a cold observable.
- * The original flow is cancelled if the observable subscriber was disposed.
+ * The original flow is cancelled when the observable subscriber is disposed.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
@@ -106,8 +105,8 @@
}
/**
- * Converts the given flow to a cold observable.
- * The original flow is cancelled if the flowable subscriber was disposed.
+ * Converts the given flow to a cold flowable.
+ * The original flow is cancelled when the flowable subscriber is disposed.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
diff --git a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
index 1904334..ed0bc36 100644
--- a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
@@ -8,7 +8,6 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
-import kotlinx.coroutines.reactive.flow.*
import org.junit.Test
import kotlin.test.*