Flow scope (#1227)

* Introducing flowScope, builder necessary for creating cancellation-transparent flow operators
* Incorporate flow scope into flow operators

Fixes #1218
Fixes #1128
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 8ff82c5..ab51a2d 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -381,7 +381,6 @@
 	public fun fold (Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
 	public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
 	public final fun getCancellationException ()Ljava/util/concurrent/CancellationException;
-	protected fun getCancelsParent ()Z
 	public fun getChildJobCancellationCause ()Ljava/util/concurrent/CancellationException;
 	public final fun getChildren ()Lkotlin/sequences/Sequence;
 	protected final fun getCompletionCause ()Ljava/lang/Throwable;
@@ -396,6 +395,7 @@
 	public final fun isCancelled ()Z
 	public final fun isCompleted ()Z
 	public final fun isCompletedExceptionally ()Z
+	protected fun isScopedCoroutine ()Z
 	public final fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
 	protected fun onCancelling (Ljava/lang/Throwable;)V
diff --git a/kotlinx-coroutines-core/common/src/Exceptions.common.kt b/kotlinx-coroutines-core/common/src/Exceptions.common.kt
index e8c2f5e..62b6ba4 100644
--- a/kotlinx-coroutines-core/common/src/Exceptions.common.kt
+++ b/kotlinx-coroutines-core/common/src/Exceptions.common.kt
@@ -23,7 +23,7 @@
     internal val job: Job
 }
 
-internal expect class CoroutinesInternalError(message: String, cause: Throwable) : Error
+internal class CoroutinesInternalError(message: String, cause: Throwable) : Error(message, cause)
 
 internal expect fun Throwable.addSuppressedThrowable(other: Throwable)
 // For use in tests
diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt
index 4963c37..d8b6b92 100644
--- a/kotlinx-coroutines-core/common/src/JobSupport.kt
+++ b/kotlinx-coroutines-core/common/src/JobSupport.kt
@@ -319,6 +319,31 @@
         cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
     }
 
+    /**
+     * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
+     * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
+     *
+     * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
+     * may leak to the [CoroutineExceptionHandler].
+     */
+    private fun cancelParent(cause: Throwable): Boolean {
+        /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
+         * This allow parent to cancel its children (normally) without being cancelled itself, unless
+         * child crashes and produce some other exception during its completion.
+         */
+        val isCancellation = cause is CancellationException
+        val parent = parentHandle
+        // No parent -- ignore CE, report other exceptions.
+        if (parent === null || parent === NonDisposableHandle) {
+            return isCancellation
+        }
+
+        // Is scoped coroutine -- don't propagate, will be rethrown
+        if (isScopedCoroutine) return isCancellation
+        // Notify parent but don't forget to check cancellation
+        return parent.childCancelled(cause) || isCancellation
+    }
+
     private fun NodeList.notifyCompletion(cause: Throwable?) =
         notifyHandlers<JobNode<*>>(this, cause)
 
@@ -594,21 +619,29 @@
         cancelImpl(parentJob)
     }
 
-    // Child was cancelled with cause
-    // It is overridden in supervisor implementations to ignore child cancellation
-    public open fun childCancelled(cause: Throwable): Boolean =
-        cancelImpl(cause) && handlesException
+    /**
+     * Child was cancelled with a cause.
+     * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
+     * It is overridden in supervisor implementations to completely ignore any child cancellation.
+     * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
+     *
+     * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
+     * may leak to the [CoroutineExceptionHandler].
+     */
+    public open fun childCancelled(cause: Throwable): Boolean {
+        if (cause is CancellationException) return true
+        return cancelImpl(cause) && handlesException
+    }
 
     /**
      * Makes this [Job] cancelled with a specified [cause].
      * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
      */
-    public fun cancelCoroutine(cause: Throwable?) =
-        cancelImpl(cause)
+    public fun cancelCoroutine(cause: Throwable?) = cancelImpl(cause)
 
     // cause is Throwable or ParentJob when cancelChild was invoked
     // returns true is exception was handled, false otherwise
-    private fun cancelImpl(cause: Any?): Boolean {
+    internal fun cancelImpl(cause: Any?): Boolean {
         if (onCancelComplete) {
             // make sure it is completing, if cancelMakeCompleting returns true it means it had make it
             // completing and had recorded exception
@@ -912,14 +945,12 @@
     protected open fun onCancelling(cause: Throwable?) {}
 
     /**
-     * When this function returns `true` the parent is cancelled on cancellation of this job.
-     * Note that [CancellationException] is considered "normal" and parent is not cancelled when child produces it.
-     * This allows parent to cancel its children (normally) without being cancelled itself, unless
-     * child crashes and produce some other exception during its completion.
-     *
-     * @suppress **This is unstable API and it is subject to change.*
+     * Returns `true` for scoped coroutines.
+     * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
+     * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
+     * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
      */
-    protected open val cancelsParent: Boolean get() = true
+    protected open val isScopedCoroutine: Boolean get() = false
 
     /**
      * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
@@ -939,20 +970,9 @@
      *
      * This method is invoked **exactly once** when the final exception of the job is determined
      * and before it becomes complete. At the moment of invocation the job and all its children are complete.
-     *
-     * @suppress **This is unstable API and it is subject to change.*
      */
     protected open fun handleJobException(exception: Throwable): Boolean = false
 
-    private fun cancelParent(cause: Throwable): Boolean {
-        // CancellationException is considered "normal" and parent is not cancelled when child produces it.
-        // This allow parent to cancel its children (normally) without being cancelled itself, unless
-        // child crashes and produce some other exception during its completion.
-        if (cause is CancellationException) return true
-        if (!cancelsParent) return false
-        return parentHandle?.childCancelled(cause) == true
-    }
-
     /**
      * Override for completion actions that need to update some external object depending on job's state,
      * right before all the waiters for coroutine's completion are notified.
diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt
index 3c902db..8bfaf33 100644
--- a/kotlinx-coroutines-core/common/src/Timeout.kt
+++ b/kotlinx-coroutines-core/common/src/Timeout.kt
@@ -85,9 +85,7 @@
     override val defaultResumeMode: Int get() = MODE_DIRECT
     override val callerFrame: CoroutineStackFrame? get() = (uCont as? CoroutineStackFrame)
     override fun getStackTraceElement(): StackTraceElement? = null
-
-    override val cancelsParent: Boolean
-        get() = false // it throws exception to parent instead of cancelling it
+    override val isScopedCoroutine: Boolean get() = true
 
     @Suppress("LeakingThis", "Deprecation")
     override fun run() {
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index d7e01ab..9e34773 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -126,7 +126,7 @@
     return coroutine
 }
 
-private class ProducerCoroutine<E>(
+internal open class ProducerCoroutine<E>(
     parentContext: CoroutineContext, channel: Channel<E>
 ) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
     override val isActive: Boolean
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 6147b65..b4ff26d 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -313,7 +313,7 @@
 public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
     channelFlow(block)
 
-// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow 
+// ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow
 private class ChannelFlowBuilder<T>(
     private val block: suspend ProducerScope<T>.() -> Unit,
     context: CoroutineContext = EmptyCoroutineContext,
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 77beb37..bf20d2f 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -118,7 +118,6 @@
 @Deprecated(message = "withContext in flow body is deprecated, use flowOn instead", level = DeprecationLevel.ERROR)
 public fun <T, R> FlowCollector<T>.withContext(context: CoroutineContext, block: suspend () -> R): Unit = error("Should not be called")
 
-
 /**
  * `subscribe` is Rx-specific API that has no direct match in flows.
  * One can use `launch` instead, for example the following:
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 30005af..57a0132 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -63,10 +63,10 @@
         scope.broadcast(context, produceCapacity, start, block = collectToFun)
 
     fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
-        scope.produce(context, produceCapacity, block = collectToFun)
+        scope.flowProduce(context, produceCapacity, block = collectToFun)
 
     override suspend fun collect(collector: FlowCollector<T>) =
-        coroutineScope { // todo: flowScope
+        coroutineScope {
             val channel = produceImpl(this)
             channel.consumeEach { collector.emit(it) }
         }
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
new file mode 100644
index 0000000..98f5cec
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.intrinsics.*
+import kotlin.coroutines.*
+import kotlin.coroutines.intrinsics.*
+import kotlinx.coroutines.flow.unsafeFlow as flow
+
+/**
+ * Creates a [CoroutineScope] and calls the specified suspend block with this scope.
+ * This builder is similar to [coroutineScope] with the only exception that it *ties* lifecycle of children
+ * and itself regarding the cancellation, thus being cancelled when one of the children becomes cancelled.
+ *
+ * For example:
+ * ```
+ * flowScope {
+ *     launch {
+ *         throw CancellationException()
+ *     }
+ * } // <- CE will be rethrown here
+ * ```
+ */
+internal suspend fun <R> flowScope(@BuilderInference block: suspend CoroutineScope.() -> R): R =
+    suspendCoroutineUninterceptedOrReturn { uCont ->
+        val coroutine = FlowCoroutine(uCont.context, uCont)
+        coroutine.startUndispatchedOrReturn(coroutine, block)
+    }
+
+/**
+ * Creates a flow that also provides a [CoroutineScope] for each collector
+ * Shorthand for:
+ * ```
+ * flow {
+ *     flowScope {
+ *         ...
+ *     }
+ * }
+ * ```
+ * with additional constraint on cancellation.
+ * To cancel child without cancelling itself, `cancel(ChildCancelledException())` should be used.
+ */
+internal fun <R> scopedFlow(@BuilderInference block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
+    flow {
+        val collector = this
+        flowScope { block(collector) }
+    }
+
+/*
+ * Shortcut for produce { flowScope {block() } }
+ */
+internal fun <T> CoroutineScope.flowProduce(
+    context: CoroutineContext,
+    capacity: Int = 0, @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): ReceiveChannel<T> {
+    val channel = Channel<T>(capacity)
+    val newContext = newCoroutineContext(context)
+    val coroutine = FlowProduceCoroutine(newContext, channel)
+    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+    return coroutine
+}
+
+private class FlowCoroutine<T>(
+    context: CoroutineContext,
+    uCont: Continuation<T>
+) : ScopeCoroutine<T>(context, uCont) {
+
+    public override fun childCancelled(cause: Throwable): Boolean {
+        if (cause is ChildCancelledException) return true
+        return cancelImpl(cause)
+    }
+}
+
+private class FlowProduceCoroutine<T>(
+    parentContext: CoroutineContext,
+    channel: Channel<T>
+) : ProducerCoroutine<T>(parentContext, channel) {
+
+    public override fun childCancelled(cause: Throwable): Boolean {
+        if (cause is ChildCancelledException) return true
+        return cancelImpl(cause)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
similarity index 71%
rename from kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt
rename to kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
index 6d5a4b4..6c675b3 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.common.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt
@@ -11,3 +11,8 @@
  * This exception should never escape outside of operator's implementation.
  */
 internal expect class AbortFlowException() : CancellationException
+
+/**
+ * Exception used to cancel child of [scopedFlow] without cancelling the whole scope.
+ */
+internal expect class ChildCancelledException() : CancellationException
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index 32e9b3f..4db3044 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -60,34 +60,33 @@
  */
 public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
     require(timeoutMillis > 0) { "Debounce timeout should be positive" }
-    return flow {
-        coroutineScope {
-            val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
-            // Channel is not closed deliberately as there is no close with value
-            val collector = async {
-                collect { value -> values.send(value ?: NULL) }
-            }
+    return scopedFlow { downstream ->
+        val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
+        // Channel is not closed deliberately as there is no close with value
+        val collector = async {
+            collect { value -> values.send(value ?: NULL) }
+        }
 
-            var isDone = false
-            var lastValue: Any? = null
-            while (!isDone) {
-                select<Unit> {
-                    values.onReceive {
-                        lastValue = it
-                    }
+        var isDone = false
+        var lastValue: Any? = null
+        while (!isDone) {
+            select<Unit> {
+                values.onReceive {
+                    lastValue = it
+                }
 
-                    lastValue?.let { value -> // set timeout when lastValue != null
-                        onTimeout(timeoutMillis) {
-                            lastValue = null // Consume the value
-                            emit(NULL.unbox(value))
-                        }
+                lastValue?.let { value ->
+                    // set timeout when lastValue != null
+                    onTimeout(timeoutMillis) {
+                        lastValue = null // Consume the value
+                        downstream.emit(NULL.unbox(value))
                     }
+                }
 
-                    // Close with value 'idiom'
-                    collector.onAwait {
-                        if (lastValue != null) emit(NULL.unbox(lastValue))
-                        isDone = true
-                    }
+                // Close with value 'idiom'
+                collector.onAwait {
+                    if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
+                    isDone = true
                 }
             }
         }
@@ -112,32 +111,31 @@
  */
 public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
     require(periodMillis > 0) { "Sample period should be positive" }
-    return flow {
-        coroutineScope {
-            val values = produce<Any?>(capacity = Channel.CONFLATED) {  // Actually Any, KT-30796
-                collect { value -> send(value ?: NULL) }
-            }
+    return scopedFlow { downstream ->
+        val values = produce<Any?>(capacity = Channel.CONFLATED) {
+            // Actually Any, KT-30796
+            collect { value -> send(value ?: NULL) }
+        }
 
-            var isDone = false
-            var lastValue: Any? = null
-            val ticker = fixedPeriodTicker(periodMillis)
-            while (!isDone) {
-                select<Unit> {
-                    values.onReceiveOrNull {
-                        if (it == null) {
-                            ticker.cancel()
-                            isDone = true
-                        } else {
-                            lastValue = it
-                        }
+        var isDone = false
+        var lastValue: Any? = null
+        val ticker = fixedPeriodTicker(periodMillis)
+        while (!isDone) {
+            select<Unit> {
+                values.onReceiveOrNull {
+                    if (it == null) {
+                        ticker.cancel(ChildCancelledException())
+                        isDone = true
+                    } else {
+                        lastValue = it
                     }
+                }
 
-                    // todo: shall be start sampling only when an element arrives or sample aways as here?
-                    ticker.onReceive {
-                        val value = lastValue ?: return@onReceive
-                        lastValue = null // Consume the value
-                        emit(NULL.unbox(value))
-                    }
+                // todo: shall be start sampling only when an element arrives or sample aways as here?
+                ticker.onReceive {
+                    val value = lastValue ?: return@onReceive
+                    lastValue = null // Consume the value
+                    downstream.emit(NULL.unbox(value))
                 }
             }
         }
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index f7a6447..0fa6e8a 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -129,17 +129,16 @@
  * produces `aa bb b_last`
  */
 @FlowPreview
-public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
-    coroutineScope {
-        var previousFlow: Job? = null
-        collect { value ->
-            // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
-            previousFlow?.cancelAndJoin()
-            // Undispatched to have better user experience in case of synchronous flows
-            previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
-                transform(value).collect { innerValue ->
-                    emit(innerValue)
-                }
+public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
+    var previousFlow: Job? = null
+    collect { value ->
+        // Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
+        previousFlow?.cancel(ChildCancelledException())
+        previousFlow?.join()
+        // Undispatched to have better user experience in case of synchronous flows
+        previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
+            transform(value).collect { innerValue ->
+                downstream.emit(innerValue)
             }
         }
     }
@@ -175,7 +174,7 @@
     override suspend fun flowCollect(collector: FlowCollector<T>) {
         // this function should not have been invoked when channel was explicitly requested
         check(capacity == OPTIONAL_CHANNEL)
-        coroutineScope { // todo: flowScope
+        flowScope {
             mergeImpl(this, collector.asConcurrentFlowCollector())
         }
     }
diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt
index 3361694..9197ec8 100644
--- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt
+++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt
@@ -17,13 +17,12 @@
 ) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
     final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
     final override fun getStackTraceElement(): StackTraceElement? = null
+    final override val isScopedCoroutine: Boolean get() = true
+
     override val defaultResumeMode: Int get() = MODE_DIRECT
 
     internal val parent: Job? get() = parentContext[Job]
 
-    override val cancelsParent: Boolean
-        get() = false // it throws exception to parent instead of cancelling it
-
     @Suppress("UNCHECKED_CAST")
     override fun afterCompletionInternal(state: Any?, mode: Int) {
         if (state is CompletedExceptionally) {
diff --git a/kotlinx-coroutines-core/common/test/SupervisorTest.kt b/kotlinx-coroutines-core/common/test/SupervisorTest.kt
index fae7091..535073e 100644
--- a/kotlinx-coroutines-core/common/test/SupervisorTest.kt
+++ b/kotlinx-coroutines-core/common/test/SupervisorTest.kt
@@ -219,4 +219,22 @@
         yield() // to coroutineScope
         finish(7)
     }
+
+    @Test
+    fun testSupervisorJobCancellationException() = runTest {
+        val job = SupervisorJob()
+        val child = launch(job + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
+            expect(1)
+            hang {
+                expect(3)
+            }
+        }
+
+        yield()
+        expect(2)
+        child.cancelAndJoin()
+        job.complete()
+        job.join()
+        finish(4)
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index 0ae30e8..a77f8fa 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -81,4 +81,63 @@
         assertFailsWith<TestException>(flow)
         finish(4)
     }
+
+    @Test
+    fun testMergeOneCoroutineWithCancellation() = runTest {
+        val flow = flowOf(1, 2, 3)
+        val f = flow.mergeOneCoroutine(flow).take(2)
+        assertEquals(listOf(1, 1), f.toList())
+    }
+
+    @Test
+    fun testMergeTwoCoroutinesWithCancellation() = runTest {
+        val flow = flowOf(1, 2, 3)
+        val f = flow.mergeTwoCoroutines(flow).take(2)
+        assertEquals(listOf(1, 1), f.toList())
+    }
+
+    private fun Flow<Int>.mergeTwoCoroutines(other: Flow<Int>): Flow<Int> = channelFlow {
+        launch {
+            collect { send(it); yield() }
+        }
+        launch {
+            other.collect { send(it) }
+        }
+    }
+
+    private fun Flow<Int>.mergeOneCoroutine(other: Flow<Int>): Flow<Int> = channelFlow {
+        launch {
+            collect { send(it); yield() }
+        }
+
+        other.collect { send(it); yield() }
+    }
+
+    @Test
+    fun testBufferWithTimeout() = runTest {
+        fun Flow<Int>.bufferWithTimeout(): Flow<Int> = channelFlow {
+            expect(2)
+            launch {
+                expect(3)
+                hang {
+                    expect(5)
+                }
+            }
+            launch {
+                expect(4)
+                collect {
+                    withTimeout(-1) {
+                        send(it)
+                    }
+                    expectUnreached()
+                }
+                expectUnreached()
+            }
+        }
+
+        val flow = flowOf(1, 2, 3).bufferWithTimeout()
+        expect(1)
+        assertFailsWith<TimeoutCancellationException>(flow)
+        finish(6)
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
index d992d06..a6b5340 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
@@ -45,4 +45,3 @@
         finish(3)
     }
 }
-
diff --git a/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt
new file mode 100644
index 0000000..d41ab88
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class FlowScopeTest : TestBase() {
+
+    @Test
+    fun testCancellation() = runTest {
+        assertFailsWith<CancellationException> {
+            flowScope {
+                expect(1)
+                val child = launch {
+                    expect(3)
+                    hang { expect(5) }
+                }
+                expect(2)
+                yield()
+                expect(4)
+                child.cancel()
+            }
+        }
+        finish(6)
+    }
+
+    @Test
+    fun testCancellationWithChildCancelled() = runTest {
+        flowScope {
+            expect(1)
+            val child = launch {
+                expect(3)
+                hang { expect(5) }
+            }
+            expect(2)
+            yield()
+            expect(4)
+            child.cancel(ChildCancelledException())
+        }
+        finish(6)
+    }
+
+    @Test
+    fun testCancellationWithSuspensionPoint() = runTest {
+        assertFailsWith<CancellationException> {
+            flowScope {
+                expect(1)
+                val child = launch {
+                    expect(3)
+                    hang { expect(6) }
+                }
+                expect(2)
+                yield()
+                expect(4)
+                child.cancel()
+                hang { expect(5) }
+            }
+        }
+        finish(7)
+    }
+
+    @Test
+    fun testNestedScopes() = runTest {
+        assertFailsWith<CancellationException> {
+            flowScope {
+                flowScope {
+                    launch {
+                       throw CancellationException(null)
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
index bda9927..54244f0 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt
@@ -197,6 +197,46 @@
         assertFailsWith<TestException>(flow)
         finish(2)
     }
+
+    @Test
+    fun testCancellationExceptionUpstream() = runTest {
+        val f1 = flow {
+            expect(1)
+            emit(1)
+            throw CancellationException("")
+        }
+        val f2 = flow {
+            emit(1)
+            hang { expect(3) }
+        }
+
+        val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach { expect(2) }
+        assertFailsWith<CancellationException>(flow)
+        finish(4)
+    }
+
+    @Test
+    fun testCancellationExceptionDownstream() = runTest {
+        val f1 = flow {
+            emit(1)
+            expect(2)
+            hang { expect(5) }
+        }
+        val f2 = flow {
+            emit(1)
+            expect(3)
+            hang { expect(6) }
+        }
+
+        val flow = f1.combineLatest(f2, { _, _ -> 1 }).onEach {
+            expect(1)
+            yield()
+            expect(4)
+            throw CancellationException("")
+        }
+        assertFailsWith<CancellationException>(flow)
+        finish(7)
+    }
 }
 
 class CombineLatestTest : CombineLatestTestBase() {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
index 607d4cd..2a6e9c1 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
@@ -95,20 +95,25 @@
     }
 
     @Test
-    fun testUpstreamError() = runTest {
+    fun testUpstreamError()= testUpstreamError(TimeoutCancellationException(""))
+
+    @Test
+    fun testUpstreamErrorCancellation() = testUpstreamError(TimeoutCancellationException(""))
+
+    private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
         val latch = Channel<Unit>()
         val flow = flow {
             expect(1)
             emit(1)
             expect(2)
             latch.receive()
-            throw TestException()
+            throw cause
         }.debounce(1).map {
             latch.send(Unit)
             hang { expect(3) }
         }
 
-        assertFailsWith<TestException>(flow)
+        assertFailsWith<T>(flow)
         finish(4)
     }
 
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
index 5d007c3..6069ae6 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -36,4 +36,41 @@
         consumer.cancelAndJoin()
         finish(3)
     }
+
+    @Test
+    fun testCancellationExceptionDownstream() = runTest {
+        val flow = flow {
+            emit(1)
+            hang { expect(2) }
+        }.flatMapMerge {
+            flow {
+                emit(it)
+                expect(1)
+                throw CancellationException("")
+            }
+        }
+
+        assertFailsWith<CancellationException>(flow)
+        finish(3)
+    }
+
+    @Test
+    fun testCancellationExceptionUpstream() = runTest {
+        val flow = flow {
+            expect(1)
+            emit(1)
+            expect(2)
+            yield()
+            throw CancellationException("")
+        }.flatMapMerge {
+            flow {
+                expect(3)
+                emit(it)
+                hang { expect(4) }
+            }
+        }
+
+        assertFailsWith<CancellationException>(flow)
+        finish(5)
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
index 49df21d..4adc354 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
@@ -234,6 +234,33 @@
         finish(6)
     }
 
+    @Test
+    fun testTimeoutExceptionUpstream() = runTest {
+        val flow = flow {
+            emit(1)
+            yield()
+            withTimeout(-1) {}
+            emit(42)
+        }.flowOn(NamedDispatchers("foo")).onEach {
+            expect(1)
+        }
+        assertFailsWith<TimeoutCancellationException>(flow)
+        finish(2)
+    }
+
+    @Test
+    fun testTimeoutExceptionDownstream() = runTest {
+        val flow = flow {
+            emit(1)
+            hang { expect(2) }
+        }.flowOn(NamedDispatchers("foo")).onEach {
+            expect(1)
+            withTimeout(-1) {}
+        }
+        assertFailsWith<TimeoutCancellationException>(flow)
+        finish(3)
+    }
+
     private inner class Source(private val value: Int) {
         public var contextName: String = "unknown"
 
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
index 055f847..a785814 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt
@@ -199,4 +199,33 @@
         ensureActive()
         finish(5)
     }
+
+    @Test
+    fun testTimeoutException() = runTest {
+        val flow = flow {
+            emit(1)
+            yield()
+            withTimeout(-1) {}
+            emit(42)
+        }.flowWith(NamedDispatchers("foo")) {
+            onEach { expect(1) }
+        }
+        assertFailsWith<TimeoutCancellationException>(flow)
+        finish(2)
+    }
+
+    @Test
+    fun testTimeoutExceptionDownstream() = runTest {
+        val flow = flow {
+            emit(1)
+            hang { expect(2) }
+        }.flowWith(NamedDispatchers("foo")) {
+            onEach {
+                expect(1)
+                withTimeout(-1) {}
+            }
+        }
+        assertFailsWith<TimeoutCancellationException>(flow)
+        finish(3)
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
index e77b128..9c96352 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
@@ -169,20 +169,25 @@
     }
 
     @Test
-    fun testUpstreamError() = runTest {
+    fun testUpstreamError() = testUpstreamError(TestException())
+
+    @Test
+    fun testUpstreamErrorCancellationException() = testUpstreamError(CancellationException(""))
+
+    private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
         val latch = Channel<Unit>()
         val flow = flow {
             expect(1)
             emit(1)
             expect(2)
             latch.receive()
-            throw TestException()
+            throw cause
         }.sample(1).map {
             latch.send(Unit)
             hang { expect(3) }
         }
 
-        assertFailsWith<TestException>(flow)
+        assertFailsWith<T>(flow)
         finish(4)
     }
 
@@ -219,7 +224,6 @@
         finish(3)
     }
 
-
     @Test
     fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest {
         val flow = flow {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
index 933bb16..fabca72 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt
@@ -113,4 +113,10 @@
         assertFailsWith<TestException>(flow)
         finish(5)
     }
+
+    @Test
+    fun testTake() = runTest {
+        val flow = flowOf(1, 2, 3, 4, 5).switchMap { flowOf(it) }
+        assertEquals(listOf(1), flow.take(1).toList())
+    }
 }
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
index decd230..b28320c 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
@@ -93,7 +93,7 @@
     }
 
     @Test
-    fun testCancesWhenFlowIsDone2() = runTest {
+    fun testCancelWhenFlowIsDone2() = runTest {
         val f1 = flow<String> {
             emit("1")
             emit("2")
@@ -189,4 +189,52 @@
         assertFailsWith<TestException>(flow)
         finish(2)
     }
+
+    @Test
+    fun testCancellationUpstream() = runTest {
+        val f1 = flow {
+            expect(1)
+            emit(1)
+            yield()
+            expect(4)
+            throw CancellationException("")
+        }
+
+        val f2 = flow {
+            expect(2)
+            emit(1)
+            expect(5)
+            hang { expect(6) }
+        }
+
+        val flow = f1.zip(f2, { _, _ -> 1 }).onEach { expect(3) }
+        assertFailsWith<CancellationException>(flow)
+        finish(7)
+    }
+
+    @Test
+    fun testCancellationDownstream() = runTest {
+        val f1 = flow {
+            expect(1)
+            emit(1)
+            yield()
+            expect(4)
+            hang { expect(6) }
+        }
+
+        val f2 = flow {
+            expect(2)
+            emit(1)
+            expect(5)
+            hang { expect(7) }
+        }
+
+        val flow = f1.zip(f2, { _, _ -> 1 }).onEach {
+            expect(3)
+            yield()
+            throw CancellationException("")
+        }
+        assertFailsWith<CancellationException>(flow)
+        finish(8)
+    }
 }
diff --git a/kotlinx-coroutines-core/js/src/Exceptions.kt b/kotlinx-coroutines-core/js/src/Exceptions.kt
index 83a0cda..f427041 100644
--- a/kotlinx-coroutines-core/js/src/Exceptions.kt
+++ b/kotlinx-coroutines-core/js/src/Exceptions.kt
@@ -48,8 +48,6 @@
         (message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0)
 }
 
-internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message.withCause(cause))
-
 @Suppress("FunctionName")
 internal fun IllegalStateException(message: String, cause: Throwable?) =
     IllegalStateException(message.withCause(cause))
diff --git a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt
similarity index 72%
rename from kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
rename to kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt
index d6a9c31..8422f2b 100644
--- a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
+++ b/kotlinx-coroutines-core/js/src/flow/internal/FlowExceptions.kt
@@ -7,3 +7,4 @@
 import kotlinx.coroutines.*
 
 internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
+internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt
index d8f8ee3..52841cd 100644
--- a/kotlinx-coroutines-core/jvm/src/Builders.kt
+++ b/kotlinx-coroutines-core/jvm/src/Builders.kt
@@ -59,8 +59,7 @@
     private val blockedThread: Thread,
     private val eventLoop: EventLoop?
 ) : AbstractCoroutine<T>(parentContext, true) {
-    override val cancelsParent: Boolean
-        get() = false // it throws exception to parent instead of cancelling it
+    override val isScopedCoroutine: Boolean get() = true
 
     override fun afterCompletionInternal(state: Any?, mode: Int) {
         // wake up blocked thread
diff --git a/kotlinx-coroutines-core/jvm/src/Exceptions.kt b/kotlinx-coroutines-core/jvm/src/Exceptions.kt
index bc7e92c..7a8f385 100644
--- a/kotlinx-coroutines-core/jvm/src/Exceptions.kt
+++ b/kotlinx-coroutines-core/jvm/src/Exceptions.kt
@@ -80,8 +80,6 @@
         (message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0)
 }
 
-internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message, cause)
-
 @Suppress("NOTHING_TO_INLINE")
 internal actual inline fun Throwable.addSuppressedThrowable(other: Throwable) =
-    addSuppressed(other)
+    addSuppressed(other)
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
index ee41a0a..ffabb99 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
@@ -127,7 +127,6 @@
     channel: Channel<E>,
     active: Boolean
 ) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
-    override val cancelsParent: Boolean get() = true
 
     override fun onCancelling(cause: Throwable?) {
         _channel.cancel(cause?.let {
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt
deleted file mode 100644
index 7ff34e7..0000000
--- a/kotlinx-coroutines-core/jvm/src/flow/internal/AbortFlowException.kt
+++ /dev/null
@@ -1,11 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.internal
-
-import kotlinx.coroutines.*
-
-internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
-    override fun fillInStackTrace(): Throwable = this
-}
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt
new file mode 100644
index 0000000..d8d4d21
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+
+internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
+    override fun fillInStackTrace(): Throwable {
+        if (DEBUG) super.fillInStackTrace()
+        return this
+    }
+}
+
+internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled") {
+    override fun fillInStackTrace(): Throwable {
+        if (DEBUG) super.fillInStackTrace()
+        return this
+    }
+}
diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt
index 82fd81a..0dc90d5 100644
--- a/kotlinx-coroutines-core/native/src/Builders.kt
+++ b/kotlinx-coroutines-core/native/src/Builders.kt
@@ -54,8 +54,7 @@
     parentContext: CoroutineContext,
     private val eventLoop: EventLoop?
 ) : AbstractCoroutine<T>(parentContext, true) {
-    override val cancelsParent: Boolean
-        get() = false // it throws exception to parent instead of cancelling it
+    override val isScopedCoroutine: Boolean get() = true
 
     @Suppress("UNCHECKED_CAST")
     fun joinBlocking(): T = memScoped {
diff --git a/kotlinx-coroutines-core/native/src/Exceptions.kt b/kotlinx-coroutines-core/native/src/Exceptions.kt
index 29c3ce5..109b910 100644
--- a/kotlinx-coroutines-core/native/src/Exceptions.kt
+++ b/kotlinx-coroutines-core/native/src/Exceptions.kt
@@ -48,8 +48,6 @@
         (message!!.hashCode() * 31 + job.hashCode()) * 31 + (cause?.hashCode() ?: 0)
 }
 
-internal actual class CoroutinesInternalError actual constructor(message: String, cause: Throwable) : Error(message.withCause(cause))
-
 @Suppress("FunctionName")
 internal fun IllegalStateException(message: String, cause: Throwable?) =
     IllegalStateException(message.withCause(cause))
diff --git a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt
deleted file mode 100644
index d6a9c31..0000000
--- a/kotlinx-coroutines-core/native/src/flow/internal/AbortFlowException.kt
+++ /dev/null
@@ -1,9 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.internal
-
-import kotlinx.coroutines.*
-
-internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
diff --git a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt
similarity index 71%
copy from kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
copy to kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt
index d6a9c31..4a291ea 100644
--- a/kotlinx-coroutines-core/js/src/flow/internal/AbortFlowException.kt
+++ b/kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt
@@ -7,3 +7,5 @@
 import kotlinx.coroutines.*
 
 internal actual class AbortFlowException : CancellationException("Flow was aborted, no more elements needed")
+internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
+