Context passing between coroutines and Reactor Mono/Flux (#1138)
Fixes #284
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 843c94c..a1364af 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -76,7 +76,8 @@
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
-private class PublisherCoroutine<in T>(
+@InternalCoroutinesApi
+public class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
private val subscriber: Subscriber<T>
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
diff --git a/reactive/kotlinx-coroutines-reactor/build.gradle b/reactive/kotlinx-coroutines-reactor/build.gradle
index 72ef6e5..c73716d 100644
--- a/reactive/kotlinx-coroutines-reactor/build.gradle
+++ b/reactive/kotlinx-coroutines-reactor/build.gradle
@@ -12,4 +12,12 @@
url = new URL("https://projectreactor.io/docs/core/$reactor_vesion/api/")
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
+}
+
+compileTestKotlin {
+ kotlinOptions.jvmTarget = "1.8" // target JVM 1.8 bytecode for test sources; presumably needed to call static interface methods such as Context.of — TODO confirm
+}
+
+compileKotlin {
+ kotlinOptions.jvmTarget = "1.8" // same JVM 1.8 bytecode target for the main sources
}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 785b465..316146b 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -1,3 +1,4 @@
+
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@@ -9,6 +10,8 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
+import org.reactivestreams.Publisher
+import reactor.core.CoreSubscriber
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.LowPriorityInOverloadResolution
@@ -41,8 +44,8 @@
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flux<T> {
require(context[Job] === null) { "Flux context cannot contain job in it." +
- "Its lifecycle should be managed via Disposable handle. Had $context" }
- return Flux.from(publishInternal(GlobalScope, context, block))
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return Flux.from(reactorPublish(GlobalScope, context, block))
}
@Deprecated(
@@ -55,4 +58,20 @@
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flux<T> =
- Flux.from(publishInternal(this, context, block))
+ Flux.from(reactorPublish(this, context, block))
+
+private fun <T> reactorPublish(
+    scope: CoroutineScope,
+    context: CoroutineContext = EmptyCoroutineContext,
+    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Publisher<T> = Publisher { downstream ->
+    // a null subscriber must be rejected with an NPE, as the specification demands
+    if (downstream == null) throw NullPointerException("Subscriber cannot be null")
+    require(downstream is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
+    val subscriberContext = downstream.currentContext()
+    // merge the subscriber's Reactor context into the one carried by `context`, if there is one
+    val mergedContext = context[ReactorContext]?.context?.putAll(subscriberContext) ?: subscriberContext
+    val coroutine = PublisherCoroutine(scope.newCoroutineContext(context + mergedContext.asCoroutineContext()), downstream)
+    downstream.onSubscribe(coroutine) // must precede start(): avoids unnecessary suspensions
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
index a0f65af..b218f6d 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
@@ -53,7 +53,8 @@
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
- val newContext = scope.newCoroutineContext(context)
+ val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
+ val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@@ -78,7 +79,7 @@
handleCoroutineException(context, cause)
}
}
-
+
override fun dispose() {
disposed = true
cancel()
@@ -86,4 +87,3 @@
override fun isDisposed(): Boolean = disposed
}
-
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
new file mode 100644
index 0000000..5a4ccd0
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -0,0 +1,44 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import reactor.util.context.Context
+import kotlin.coroutines.*
+
+/**
+ * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration of Reactor and kotlinx.coroutines.
+ *
+ * Use [Context.asCoroutineContext] to wrap a Reactor [Context] so that it can be added to a [CoroutineContext].
+ *
+ * Reactor builders [mono] and [flux] merge this element (when it is present in the context passed to the builder)
+ * with the Reactor context of the subscriber, and expose the merged context inside the builder block
+ * as `coroutineContext[ReactorContext]`.
+ *
+ * ### Usages
+ * Passing a Reactor context to a builder through its coroutine context:
+ * ```
+ * mono(Context.of("key", "value").asCoroutineContext()) {
+ *     println(coroutineContext[ReactorContext]?.context) // the Reactor Context holding "key" -> "value"
+ * }.subscribe()
+ * ```
+ *
+ * Accessing the Reactor context supplied from the downstream:
+ * ```
+ * launch {
+ *     mono {
+ *         println(coroutineContext[ReactorContext]?.context) // the Reactor Context holding "key" -> "value"
+ *     }.subscriberContext(Context.of("key", "value"))
+ *         .subscribe()
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
+ companion object Key : CoroutineContext.Key<ReactorContext>
+}
+
+/**
+ * Converts this Reactor [Context] into a [ReactorContext] element that can be combined with a coroutine's context
+ * and read back inside the coroutine through `coroutineContext[ReactorContext]`.
+ */
+@ExperimentalCoroutinesApi
+public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(context = this)
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
new file mode 100644
index 0000000..1fb4f0b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
@@ -0,0 +1,32 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import reactor.util.context.Context
+import kotlin.test.assertEquals
+
+/** Verifies that the [mono] and [flux] builders expose the merged Reactor context via [ReactorContext]. */
+class ReactorContextTest {
+    @Test
+    fun testMonoHookedContext() = runBlocking {
+        val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
+            val ctx = coroutineContext[ReactorContext]?.context
+            buildString { (1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) } }
+        }.subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
+            .subscriberContext { ctx -> ctx.put(6, "6") }
+        // keys 1 and 7 come from the builder's context, 2..6 from the downstream subscriberContext calls
+        assertEquals("1234567", mono.awaitFirst())
+    }
+
+    @Test
+    fun testFluxContext() = runBlocking<Unit> {
+        val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
+            val ctx = coroutineContext[ReactorContext]!!.context
+            (1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
+        }.subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
+            .subscriberContext { ctx -> ctx.put(6, "6") }
+        // await the complete stream so that a missing or wrong element fails the test itself
+        assertEquals((1..7).map { it.toString() }, flux.collectList().awaitFirst())
+    }
+}
\ No newline at end of file