Context passing between coroutines and Reactor Mono/Flux (#1138)


Fixes #284

diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 843c94c..a1364af 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -76,7 +76,8 @@
 private const val SIGNALLED = -2L  // already signalled subscriber onCompleted/onError
 
 @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
-private class PublisherCoroutine<in T>(
+@InternalCoroutinesApi
+public class PublisherCoroutine<in T>(
     parentContext: CoroutineContext,
     private val subscriber: Subscriber<T>
 ) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
diff --git a/reactive/kotlinx-coroutines-reactor/build.gradle b/reactive/kotlinx-coroutines-reactor/build.gradle
index 72ef6e5..c73716d 100644
--- a/reactive/kotlinx-coroutines-reactor/build.gradle
+++ b/reactive/kotlinx-coroutines-reactor/build.gradle
@@ -12,4 +12,12 @@
         url = new URL("https://projectreactor.io/docs/core/$reactor_vesion/api/")
         packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
     }
+}
+
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+
+compileKotlin {
+    kotlinOptions.jvmTarget = "1.8"
 }
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 785b465..316146b 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -1,3 +1,4 @@
+
 /*
  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
@@ -9,6 +10,8 @@
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.reactive.*
+import org.reactivestreams.Publisher
+import reactor.core.CoreSubscriber
 import reactor.core.publisher.*
 import kotlin.coroutines.*
 import kotlin.internal.LowPriorityInOverloadResolution
@@ -41,8 +44,8 @@
     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
 ): Flux<T> {
     require(context[Job] === null) { "Flux context cannot contain job in it." +
-            "Its lifecycle should be managed via Disposable handle. Had $context" }
-    return Flux.from(publishInternal(GlobalScope, context, block))
+        "Its lifecycle should be managed via Disposable handle. Had $context" }
+    return Flux.from(reactorPublish(GlobalScope, context, block))
 }
 
 @Deprecated(
@@ -55,4 +58,20 @@
     context: CoroutineContext = EmptyCoroutineContext,
     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
 ): Flux<T> =
-    Flux.from(publishInternal(this, context, block))
+    Flux.from(reactorPublish(this, context, block))
+
+private fun <T> reactorPublish(
+    scope: CoroutineScope,
+    context: CoroutineContext = EmptyCoroutineContext,
+    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Publisher<T> = Publisher { subscriber ->
+    // specification requires NPE on null subscriber
+    if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
+    require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
+    val currentContext = subscriber.currentContext()
+    val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
+    val newContext = scope.newCoroutineContext(context + reactorContext)
+    val coroutine = PublisherCoroutine(newContext, subscriber)
+    subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
index a0f65af..b218f6d 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
@@ -53,7 +53,8 @@
     context: CoroutineContext,
     block: suspend CoroutineScope.() -> T?
 ): Mono<T> = Mono.create { sink ->
-    val newContext = scope.newCoroutineContext(context)
+    val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
+    val newContext = scope.newCoroutineContext(context + reactorContext)
     val coroutine = MonoCoroutine(newContext, sink)
     sink.onDispose(coroutine)
     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
@@ -78,7 +79,7 @@
             handleCoroutineException(context, cause)
         }
     }
-    
+
     override fun dispose() {
         disposed = true
         cancel()
@@ -86,4 +87,3 @@
 
     override fun isDisposed(): Boolean = disposed
 }
-
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
new file mode 100644
index 0000000..5a4ccd0
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -0,0 +1,44 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import reactor.util.context.Context
+import kotlin.coroutines.*
+
+/**
+ * Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
+ *
+ * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
+ *
+ * Reactor builders [mono] and [flux] use this context element to enhance the resulting `subscriberContext`.
+ *
+ * ### Usages
+ * Passing reactor context from coroutine builder to reactor entity:
+ * ```
+ * launch(Context.of("key", "value").asCoroutineContext()) {
+ *     mono {
+ *         println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
+ *     }.subscribe()
+ * }
+ * ```
+ *
+ * Accessing modified reactor context enriched from the downstream:
+ * ```
+ * launch {
+ *     mono {
+ *         println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
+ *     }.subscriberContext(Context.of("key", "value"))
+ *    .subscribe()
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
+    companion object Key : CoroutineContext.Key<ReactorContext>
+}
+
+/**
+ * Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
+ * and later used via `coroutineContext[ReactorContext]`.
+ */
+@ExperimentalCoroutinesApi
+public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
new file mode 100644
index 0000000..1fb4f0b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
@@ -0,0 +1,32 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import reactor.util.context.Context
+import kotlin.test.assertEquals
+
+class ReactorContextTest {
+    @Test
+    fun testMonoHookedContext() = runBlocking {
+        val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
+            val ctx = coroutineContext[ReactorContext]?.context
+            buildString {
+                (1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
+            }
+        }   .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
+            .subscriberContext { ctx -> ctx.put(6, "6") }
+        assertEquals(mono.awaitFirst(), "1234567")
+    }
+
+    @Test
+    fun testFluxContext() = runBlocking<Unit> {
+        val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
+            val ctx = coroutineContext[ReactorContext]!!.context
+            (1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
+        }   .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
+            .subscriberContext { ctx -> ctx.put(6, "6") }
+        var i = 0
+        flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
+    }
+}
\ No newline at end of file