Coroutine context propagation for Reactor to coroutines API migration (#1377)

* Propagation of the coroutine context of await calls into
  Mono/Flux builder
* Publisher.asFlow propagates coroutine context from `collect`
  call to the Publisher
* Flow.asFlux transform
* Optimized FlowSubscription
* kotlinx.coroutines.reactor.flow package is replaced with kotlinx.coroutines.reactor

Fixes #284
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index e807cc6..b899a3b 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -977,6 +977,10 @@
 	public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 }
 
+public final class kotlinx/coroutines/intrinsics/CancellableKt {
+	public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V
+}
+
 public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
 	public synthetic fun <init> (II)V
 	public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
index 643f641..fb24c87 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
@@ -14,11 +14,29 @@
 	public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
 }
 
+public abstract interface class kotlinx/coroutines/reactive/ContextInjector {
+	public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
+}
+
 public final class kotlinx/coroutines/reactive/ConvertKt {
 	public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
 	public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
 }
 
+public final class kotlinx/coroutines/reactive/FlowKt {
+	public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
+	public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
+}
+
+public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription {
+	public final field flow Lkotlinx/coroutines/flow/Flow;
+	public final field subscriber Lorg/reactivestreams/Subscriber;
+	public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
+	public fun cancel ()V
+	public fun request (J)V
+}
+
 public final class kotlinx/coroutines/reactive/PublishKt {
 	public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
 	public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
@@ -44,12 +62,3 @@
 	public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 }
 
-public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
-	public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
-}
-
-public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
-	public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
-}
-
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
index 46b35ed..20e20ba 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt
@@ -5,6 +5,10 @@
 	public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono;
 }
 
+public final class kotlinx/coroutines/reactor/FlowKt {
+	public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
+}
+
 public final class kotlinx/coroutines/reactor/FluxKt {
 	public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
 	public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
index c442c95..246ae2c 100644
--- a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
+++ b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
@@ -12,7 +12,8 @@
  * Use this function to start coroutine in a cancellable way, so that it can be cancelled
  * while waiting to be dispatched.
  */
-internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
+@InternalCoroutinesApi
+public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
     createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
 }
 
diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt
index d12a628..072773a 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Await.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt
@@ -10,6 +10,7 @@
 import org.reactivestreams.Publisher
 import org.reactivestreams.Subscriber
 import org.reactivestreams.Subscription
+import java.util.*
 import kotlin.coroutines.*
 
 /**
@@ -81,6 +82,16 @@
 
 // ------------------------ private ------------------------
 
+// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
+// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
+private val contextInjectors: Array<ContextInjector> =
+    ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto
+
+private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
+    contextInjectors.fold(this) { pub, contextInjector ->
+        contextInjector.injectCoroutineContext(pub, coroutineContext)
+    }
+
 private enum class Mode(val s: String) {
     FIRST("awaitFirst"),
     FIRST_OR_DEFAULT("awaitFirstOrDefault"),
@@ -93,7 +104,7 @@
     mode: Mode,
     default: T? = null
 ): T = suspendCancellableCoroutine { cont ->
-    subscribe(object : Subscriber<T> {
+    injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
         private lateinit var subscription: Subscription
         private var value: T? = null
         private var seenValue = false
diff --git a/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
new file mode 100644
index 0000000..45f6553
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
@@ -0,0 +1,15 @@
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.InternalCoroutinesApi
+import org.reactivestreams.Publisher
+import kotlin.coroutines.CoroutineContext
+
+/** @suppress */
+@InternalCoroutinesApi
+public interface ContextInjector {
+    /**
+     * Injects `ReactorContext` element from the given context into the `SubscriberContext` of the publisher.
+     * This API used as an indirection layer between `reactive` and `reactor` modules.
+     */
+    public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
new file mode 100644
index 0000000..387c8e7
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.reactivestreams.*
+import kotlinx.coroutines.intrinsics.*
+
+/**
+ * Transforms the given flow to a spec-compliant [Publisher].
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
+
+/**
+ * Adapter that transforms [Flow] into TCK-complaint [Publisher].
+ * [cancel] invocation cancels the original flow.
+ */
+@Suppress("PublisherImplementation")
+private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
+    override fun subscribe(subscriber: Subscriber<in T>?) {
+        if (subscriber == null) throw NullPointerException()
+        subscriber.onSubscribe(FlowSubscription(flow, subscriber))
+    }
+}
+
+/** @suppress */
+@InternalCoroutinesApi
+public class FlowSubscription<T>(
+    @JvmField val flow: Flow<T>,
+    @JvmField val subscriber: Subscriber<in T>
+) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) {
+    private val requested = atomic(0L)
+    private val producer = atomic<CancellableContinuation<Unit>?>(null)
+
+    override fun onStart() {
+        ::flowProcessing.startCoroutineCancellable(this)
+    }
+
+    private suspend fun flowProcessing() {
+        try {
+            consumeFlow()
+            subscriber.onComplete()
+        } catch (e: Throwable) {
+            try {
+                if (e is CancellationException) {
+                    subscriber.onComplete()
+                } else {
+                    subscriber.onError(e)
+                }
+            } catch (e: Throwable) {
+                // Last ditch report
+                handleCoroutineException(coroutineContext, e)
+            }
+        }
+    }
+
+    /*
+     * This method has at most one caller at any time (triggered from the `request` method)
+     */
+    private suspend fun consumeFlow() {
+        flow.collect { value ->
+            /*
+             * Flow is scopeless, thus if it's not active, its subscription was cancelled.
+             * No intermediate "child failed, but flow coroutine is not" states are allowed.
+             */
+            coroutineContext.ensureActive()
+            if (requested.value <= 0L) {
+                suspendCancellableCoroutine<Unit> {
+                    producer.value = it
+                    if (requested.value != 0L) it.resumeSafely()
+                }
+            }
+            requested.decrementAndGet()
+            subscriber.onNext(value)
+        }
+    }
+
+    override fun cancel() {
+        cancel(null)
+    }
+
+    override fun request(n: Long) {
+        if (n <= 0) {
+            return
+        }
+        start()
+        requested.update { value ->
+            val newValue = value + n
+            if (newValue <= 0L) Long.MAX_VALUE else newValue
+        }
+        val producer = producer.getAndSet(null) ?: return
+        producer.resumeSafely()
+    }
+
+    private fun CancellableContinuation<Unit>.resumeSafely() {
+        val token = tryResume(Unit)
+        if (token != null) {
+            completeResume(token)
+        }
+    }
+}
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
similarity index 83%
rename from reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
rename to reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
index 50338de..8da106e 100644
--- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt
@@ -2,14 +2,17 @@
  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
-package kotlinx.coroutines.reactive.flow
+@file:JvmMultifileClass
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactive
 
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.flow.internal.*
-import kotlinx.coroutines.reactive.*
 import org.reactivestreams.*
+import java.util.*
 import kotlin.coroutines.*
 
 /**
@@ -21,13 +24,11 @@
  * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
  * are discarded.
  */
-@JvmName("from")
 @ExperimentalCoroutinesApi
 public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
     PublisherAsFlow(this, 1)
 
 @FlowPreview
-@JvmName("from")
 @Deprecated(
     message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
     level = DeprecationLevel.ERROR,
@@ -46,7 +47,9 @@
         // use another channel for conflation (cannot do openSubscription)
         if (capacity < 0) return super.produceImpl(scope)
         // Open subscription channel directly
-        val channel = publisher.openSubscription(capacity)
+        val channel = publisher
+            .injectCoroutineContext(scope.coroutineContext)
+            .openSubscription(capacity)
         val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
             channel.cancel(cause?.let {
                 it as? CancellationException ?: CancellationException("Job was cancelled", it)
@@ -70,7 +73,7 @@
 
     override suspend fun collect(collector: FlowCollector<T>) {
         val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
-        publisher.subscribe(subscriber)
+        publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
         try {
             var consumed = 0L
             while (true) {
@@ -127,3 +130,11 @@
         subscription.cancel()
     }
 }
+
+// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
+// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
+private val contextInjectors: List<ContextInjector> =
+    ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()
+
+private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
+    contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
deleted file mode 100644
index 05f2391..0000000
--- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.reactive.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
-import org.reactivestreams.*
-import java.util.concurrent.atomic.*
-import kotlin.coroutines.*
-
-/**
- * Transforms the given flow to a spec-compliant [Publisher].
- */
-@JvmName("from")
-@ExperimentalCoroutinesApi
-public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
-
-/**
- * Adapter that transforms [Flow] into TCK-complaint [Publisher].
- * [cancel] invocation cancels the original flow.
- */
-@Suppress("PublisherImplementation")
-private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
-
-    override fun subscribe(subscriber: Subscriber<in T>?) {
-        if (subscriber == null) throw NullPointerException()
-        subscriber.onSubscribe(FlowSubscription(flow, subscriber))
-    }
-
-    private class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription {
-        @Volatile
-        internal var canceled: Boolean = false
-        private val requested = AtomicLong(0L)
-        private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()
-
-        // This is actually optimizable
-        private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
-            try {
-                consumeFlow()
-                subscriber.onComplete()
-            } catch (e: Throwable) {
-                // Failed with real exception, not due to cancellation
-                if (!coroutineContext[Job]!!.isCancelled) {
-                    subscriber.onError(e)
-                }
-            }
-        }
-
-        private suspend fun consumeFlow() {
-            flow.collect { value ->
-                if (!coroutineContext.isActive) {
-                    subscriber.onComplete()
-                    coroutineContext.ensureActive()
-                }
-
-                if (requested.get() == 0L) {
-                    suspendCancellableCoroutine<Unit> {
-                        producer.set(it)
-                        if (requested.get() != 0L) it.resumeSafely()
-                    }
-                }
-
-                requested.decrementAndGet()
-                subscriber.onNext(value)
-            }
-        }
-
-        override fun cancel() {
-            canceled = true
-            job.cancel()
-        }
-
-        override fun request(n: Long) {
-            if (n <= 0) {
-                return
-            }
-
-            if (canceled) return
-
-            job.start()
-            var snapshot: Long
-            var newValue: Long
-            do {
-                snapshot = requested.get()
-                newValue = snapshot + n
-                if (newValue <= 0L) newValue = Long.MAX_VALUE
-            } while (!requested.compareAndSet(snapshot, newValue))
-
-            val prev = producer.get()
-            if (prev == null || !producer.compareAndSet(prev, null)) return
-            prev.resumeSafely()
-        }
-
-        private fun CancellableContinuation<Unit>.resumeSafely() {
-            val token = tryResume(Unit)
-            if (token != null) {
-                completeResume(token)
-            }
-        }
-    }
-}
diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
new file mode 100644
index 0000000..8633492
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import org.junit.Test
+import org.reactivestreams.*
+import kotlin.test.*
+
+class FlowAsPublisherTest : TestBase() {
+
+    @Test
+    fun testErrorOnCancellationIsReported() {
+        expect(1)
+        flow<Int> {
+            emit(2)
+            try {
+                hang { expect(3) }
+            } finally {
+                throw TestException()
+            }
+        }.asPublisher().subscribe(object : Subscriber<Int> {
+            private lateinit var subscription: Subscription
+
+            override fun onComplete() {
+                expectUnreached()
+            }
+
+            override fun onSubscribe(s: Subscription?) {
+                subscription = s!!
+                subscription.request(2)
+            }
+
+            override fun onNext(t: Int) {
+                expect(t)
+                subscription.cancel()
+            }
+
+            override fun onError(t: Throwable?) {
+                assertTrue(t is TestException)
+                expect(4)
+            }
+        })
+        finish(5)
+    }
+
+    @Test
+    fun testCancellationIsNotReported() {
+        expect(1)
+        flow<Int>    {
+            emit(2)
+            hang { expect(3) }
+        }.asPublisher().subscribe(object : Subscriber<Int> {
+            private lateinit var subscription: Subscription
+
+            override fun onComplete() {
+                expect(4)
+            }
+
+            override fun onSubscribe(s: Subscription?) {
+                subscription = s!!
+                subscription.request(2)
+            }
+
+            override fun onNext(t: Int) {
+                expect(t)
+                subscription.cancel()
+            }
+
+            override fun onError(t: Throwable?) {
+                expectUnreached()
+            }
+        })
+        finish(5)
+    }
+}
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
similarity index 98%
rename from reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
index 31c5a3c..5dfd9d5 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt
@@ -4,7 +4,7 @@
 
 @file:Suppress("UNCHECKED_CAST")
 
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
 
 import kotlinx.coroutines.flow.*
 import org.junit.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
similarity index 97%
rename from reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
index 3f33b33..a37719d 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
@@ -2,12 +2,11 @@
  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
 
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.reactive.*
 import kotlin.test.*
 
 class PublisherAsFlowTest : TestBase() {
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
similarity index 95%
rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
index 2ff96eb..b710c59 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt
@@ -2,7 +2,7 @@
  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
 
 import kotlinx.coroutines.flow.*
 import org.junit.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
similarity index 96%
rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
index 1b37ee9..72d5de5 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt
@@ -2,7 +2,7 @@
  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
 
 import org.junit.*
 import org.reactivestreams.*
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
similarity index 97%
rename from reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt
rename to reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
index 9e61100..63d444c 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt
@@ -2,7 +2,7 @@
  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
-package kotlinx.coroutines.reactive.flow
+package kotlinx.coroutines.reactive
 
 import org.junit.*
 import org.reactivestreams.example.unicast.AsyncIterablePublisher
diff --git a/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector
new file mode 100644
index 0000000..0097ec3
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector
@@ -0,0 +1 @@
+kotlinx.coroutines.reactor.ReactorContextInjector
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt
new file mode 100644
index 0000000..7c6182b
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt
@@ -0,0 +1,26 @@
+@file:JvmName("FlowKt")
+
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.flowOn
+import kotlinx.coroutines.reactive.FlowSubscription
+import reactor.core.CoreSubscriber
+import reactor.core.publisher.Flux
+
+/**
+ * Converts the given flow to a cold flux.
+ * The original flow is cancelled when the flux subscriber is disposed.
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
+
+private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
+    override fun subscribe(subscriber: CoreSubscriber<in T>?) {
+        if (subscriber == null) throw NullPointerException()
+        val hasContext = subscriber.currentContext().isEmpty
+        val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
+        subscriber.onSubscribe(FlowSubscription(source, subscriber))
+    }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 316146b..18b84ac 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -74,4 +74,4 @@
     val coroutine = PublisherCoroutine(newContext, subscriber)
     subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
-}
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
index 5a4ccd0..942ba7b 100644
--- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -30,6 +30,18 @@
  *    .subscribe()
  * }
  * ```
+ *
+ * [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance
+ * is propagated into [mono] and [flux] Reactor builders:
+ * ```
+ * launch(Context.of("key", "value").asCoroutineContext()) {
+ *   assertEquals(bar().awaitFirst(), "value")
+ * }
+ *
+ * fun bar(): Mono<String> = mono {
+ *   coroutineContext[ReactorContext]!!.context.get("key")
+ * }
+ * ```
  */
 @ExperimentalCoroutinesApi
 public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
new file mode 100644
index 0000000..68309bb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
@@ -0,0 +1,22 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.reactive.*
+import org.reactivestreams.*
+import reactor.core.publisher.*
+import reactor.util.context.*
+import kotlin.coroutines.*
+
+internal class ReactorContextInjector : ContextInjector {
+    /**
+     * Injects all values from the [ReactorContext] entry of the given coroutine context
+     * into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux].
+     */
+    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
+        val reactorContext = coroutineContext[ReactorContext]?.context ?: return publisher
+        return when(publisher) {
+            is Mono -> publisher.subscriberContext(reactorContext)
+            is Flux -> publisher.subscriberContext(reactorContext)
+            else -> publisher
+        }
+    }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
index 120cd72..80feaeb 100644
--- a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
@@ -7,7 +7,6 @@
 import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
-import kotlinx.coroutines.reactive.flow.*
 import org.junit.Test
 import reactor.core.publisher.*
 import kotlin.test.*
diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
new file mode 100644
index 0000000..2f8ce9a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
@@ -0,0 +1,27 @@
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import reactor.core.publisher.Mono
+import reactor.util.context.Context
+import kotlin.test.assertEquals
+
+class FlowAsFluxTest {
+    @Test
+    fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
+        val flux = flow<String> {
+            (1..4).forEach { i -> emit(m(i).awaitFirst()) }
+        }   .asFlux()
+            .subscriberContext(Context.of(1, "1"))
+            .subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
+        var i = 0
+        flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) }
+    }
+
+    private fun m(i: Int): Mono<String> = mono {
+        val ctx = coroutineContext[ReactorContext]?.context
+        ctx?.getOrDefault(i, "noValue")
+    }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
index 1fb4f0b..e9ac200 100644
--- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
@@ -1,10 +1,13 @@
 package kotlinx.coroutines.reactor
 
 import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
 import org.junit.Test
-import reactor.util.context.Context
-import kotlin.test.assertEquals
+import reactor.core.publisher.*
+import reactor.util.context.*
+import kotlin.test.*
 
 class ReactorContextTest {
     @Test
@@ -14,8 +17,8 @@
             buildString {
                 (1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
             }
-        }   .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
-            .subscriberContext { ctx -> ctx.put(6, "6") }
+        }  .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
+           .subscriberContext { ctx -> ctx.put(6, "6") }
         assertEquals(mono.awaitFirst(), "1234567")
     }
 
@@ -29,4 +32,80 @@
         var i = 0
         flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
     }
+
+    @Test
+    fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) {
+        val result = mono(Context.of(1, "1").asCoroutineContext()) {
+            val ctx = coroutineContext[ReactorContext]?.context
+            buildString {
+                (1..3).forEach { append(ctx?.getOrDefault(it, "noValue")) }
+            }
+        }  .subscriberContext(Context.of(2, "2"))
+            .awaitFirst()
+        assertEquals(result, "123")
+    }
+
+    @Test
+    fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) {
+        assertEquals(m().awaitFirst(), "7")
+        assertEquals(m().awaitFirstOrDefault("noValue"), "7")
+        assertEquals(m().awaitFirstOrNull(), "7")
+        assertEquals(m().awaitFirstOrElse { "noValue" }, "7")
+        assertEquals(m().awaitLast(), "7")
+        assertEquals(m().awaitSingle(), "7")
+    }
+
+    @Test
+    fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
+        Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+    ) {
+        assertEquals(f().awaitFirst(), "1")
+        assertEquals(f().awaitFirstOrDefault("noValue"), "1")
+        assertEquals(f().awaitFirstOrNull(), "1")
+        assertEquals(f().awaitFirstOrElse { "noValue" }, "1")
+        assertEquals(f().awaitLast(), "3")
+        var i = 0
+        f().subscribe { str -> i++; assertEquals(str, i.toString()) }
+    }
+
+    private fun m(): Mono<String> = mono {
+        val ctx = coroutineContext[ReactorContext]?.context
+        ctx?.getOrDefault(7, "noValue")
+    }
+
+
+    private fun f(): Flux<String?> = flux {
+        val ctx = coroutineContext[ReactorContext]?.context
+        (1..3).forEach { send(ctx?.getOrDefault(it, "noValue")) }
+    }
+
+    @Test
+    fun testFlowToFluxContextPropagation() = runBlocking(
+        Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+    ) {
+        var i = 0
+        // call "collect" on the converted Flow
+        bar().collect { str ->
+            i++; assertEquals(str, i.toString())
+        }
+        assertEquals(i, 3)
+    }
+
+    @Test
+    fun testFlowToFluxDirectContextPropagation() = runBlocking(
+        Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
+    ) {
+        var i = 0
+        // convert resulting flow to channel using "produceIn"
+        val channel = bar().produceIn(this)
+        channel.consumeEach { str ->
+            i++; assertEquals(str, i.toString())
+        }
+        assertEquals(i, 3)
+    }
+
+    private fun bar(): Flow<String> = flux {
+        val ctx = coroutineContext[ReactorContext]!!.context
+        (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) }
+    }.asFlow()
 }
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index d5678de..4b12127 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -8,8 +8,7 @@
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.reactive.flow.*
-import org.reactivestreams.*
+import kotlinx.coroutines.reactive.*
 import kotlin.coroutines.*
 
 /**
@@ -82,7 +81,7 @@
 
 /**
  * Converts the given flow to a cold observable.
- * The original flow is cancelled if the observable subscriber was disposed.
+ * The original flow is cancelled when the observable subscriber is disposed.
  */
 @JvmName("from")
 @ExperimentalCoroutinesApi
@@ -106,8 +105,8 @@
 }
 
 /**
- * Converts the given flow to a cold observable.
- * The original flow is cancelled if the flowable subscriber was disposed.
+ * Converts the given flow to a cold flowable.
+ * The original flow is cancelled when the flowable subscriber is disposed.
  */
 @JvmName("from")
 @ExperimentalCoroutinesApi
diff --git a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
index 1904334..ed0bc36 100644
--- a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
@@ -8,7 +8,6 @@
 import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
-import kotlinx.coroutines.reactive.flow.*
 import org.junit.Test
 import kotlin.test.*