Coroutine context propagation for Reactor to coroutines API migration (#1377)
* Propagation of the coroutine context of await calls into
Mono/Flux builder
* Publisher.asFlow propagates coroutine context from `collect`
call to the Publisher
* Flow.asFlux transform
* Optimized FlowSubscription
* kotlinx.coroutines.reactor.flow package is replaced with kotlinx.coroutines.reactor
Fixes #284
diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
new file mode 100644
index 0000000..2f8ce9a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
@@ -0,0 +1,27 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import reactor.core.publisher.Mono
+import reactor.util.context.Context
+import kotlin.test.assertEquals
+
+class FlowAsFluxTest {
+ @Test
+ fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
+ val flux = flow<String> {
+ (1..4).forEach { i -> emit(m(i).awaitFirst()) }
+ } .asFlux()
+ .subscriberContext(Context.of(1, "1"))
+ .subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
+ var i = 0
+ flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) }
+ }
+
+ private fun m(i: Int): Mono<String> = mono {
+ val ctx = coroutineContext[ReactorContext]?.context
+ ctx?.getOrDefault(i, "noValue")
+ }
+}
\ No newline at end of file