Merge pull request #1298 from Kotlin/version-1.3.0-M2

Version 1.3.0-M2
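
The declarative operators added in this version (`onCompletion`, `catch`, `retryWhen`, `launchIn`) can be chained roughly as in the sketch below. It is an illustration only, not code from this change set: the failing flow, the `attempts` counter, and the retry limit of 5 are hypothetical, and the lambda shapes follow the signatures listed in the public API dump in this diff.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    var attempts = 0 // hypothetical counter used to simulate a transient failure

    val job = flow {
        attempts++
        emit(1)
        // Fail on the first two collections so that retryWhen has something to retry.
        if (attempts < 3) throw IllegalStateException("transient failure")
        emit(2)
    }
        // Re-collect the upstream flow while the predicate returns true.
        .retryWhen { cause, attempt -> cause is IllegalStateException && attempt < 5 }
        .onEach { value -> println("Received $value") }
        // Declarative replacement for a finally block; cause is null on normal completion.
        .onCompletion { cause -> if (cause == null) println("Flow completed successfully") }
        // Catches only exceptions coming from the upstream operators above.
        .catch { cause -> println("Exception $cause happened") }
        // Starts collection in the given scope and returns the Job.
        .launchIn(this)

    job.join()
}
```

Because `catch` only sees upstream exceptions, placing it after `onEach` and `onCompletion` covers failures in those stages as well; anything thrown further downstream would still propagate to the scope passed to `launchIn`.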
diff --git a/CHANGES.md b/CHANGES.md
index c894b8b..608073a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,12 @@
 # Change log for kotlinx.coroutines
 
+## Version 1.3.0-M2
+ * Kotlin updated to 1.3.40.
+ * `Flow` exception transparency concept.
+ * New declarative `Flow` operators: `onCompletion`, `catch`, `retryWhen`, `launchIn` (#1263).
+ * `Publisher.asFlow` is integrated with `buffer` operator.
+ * `Publisher.openSubscription` default request size is `1` instead of `0` (#1267).
+
 ## Version 1.3.0-M1
 
 Flow:
@@ -38,6 +45,10 @@
  * Prevent internal names clash that caused errors for ProGuard (#1159).
  * POSIX's `nanosleep` as `delay` in `runBlocking ` in K/N (#1225).
 
+## Version 1.2.2
+
+* Kotlin updated to 1.3.40.
+
 ## Version 1.2.1
 
 Major:
diff --git a/README.md b/README.md
index a31c31c..f3c5558 100644
--- a/README.md
+++ b/README.md
@@ -2,10 +2,10 @@
 
 [![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
 [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0)
-[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.0-M1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0-M1)
+[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.0-M2) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.0-M2)
 
 Library support for Kotlin coroutines with [multiplatform](#multiplatform) support.
-This is a companion version for Kotlin `1.3.31` release.
+This is a companion version for Kotlin `1.3.40` release.
 
 ```kotlin
 suspend fun main() = coroutineScope {
@@ -81,7 +81,7 @@
 <dependency>
     <groupId>org.jetbrains.kotlinx</groupId>
     <artifactId>kotlinx-coroutines-core</artifactId>
-    <version>1.3.0-M1</version>
+    <version>1.3.0-M2</version>
 </dependency>
 ```
 
@@ -89,7 +89,7 @@
 
 ```xml
 <properties>
-    <kotlin.version>1.3.31</kotlin.version>
+    <kotlin.version>1.3.40</kotlin.version>
 </properties>
 ```
 
@@ -99,7 +99,7 @@
 
 ```groovy
 dependencies {
-    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M1'
+    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M2'
 }
 ```
 
@@ -107,7 +107,7 @@
 
 ```groovy
 buildscript {
-    ext.kotlin_version = '1.3.31'
+    ext.kotlin_version = '1.3.40'
 }
 ```
 
@@ -125,7 +125,7 @@
 
 ```groovy
 dependencies {
-    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M1")
+    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M2")
 }
 ```
 
@@ -133,7 +133,7 @@
 
 ```groovy
 plugins {
-    kotlin("jvm") version "1.3.31"
+    kotlin("jvm") version "1.3.40"
 }
 ```
 
@@ -144,7 +144,7 @@
 Core modules of `kotlinx.coroutines` are also available for 
 [Kotlin/JS](#js) and [Kotlin/Native](#native).
 In common code that should get compiled for different platforms, add dependency to  
-[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0-M1/jar)
+[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.0-M2/jar)
 (follow the link to get the dependency declaration snippet).
 
 ### Android
@@ -153,7 +153,7 @@
 module as dependency when using `kotlinx.coroutines` on Android:
 
 ```groovy
-implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-M1'
+implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-M2'
 ```
 
 This gives you access to Android [Dispatchers.Main](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-android/kotlinx.coroutines.android/kotlinx.coroutines.-dispatchers/index.html)
@@ -172,7 +172,7 @@
 ### JS
 
 [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as 
-[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0-M1/jar)
+[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.0-M2/jar)
 (follow the link to get the dependency declaration snippet).
  
 You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM. 
@@ -180,7 +180,7 @@
 ### Native
 
 [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as 
-[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0-M1/jar)
+[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.0-M2/jar)
 (follow the link to get the dependency declaration snippet).
 
 Only single-threaded code (JS-style) on Kotlin/Native is currently supported. 
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index d48cb51..94df09a 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -78,6 +78,10 @@
 
 public final class kotlinx/coroutines/CancellableContinuationKt {
 	public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
+	public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
+	public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 }
 
 public abstract interface class kotlinx/coroutines/ChildHandle : kotlinx/coroutines/DisposableHandle {
@@ -812,12 +816,16 @@
 	public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final synthetic fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun conflate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -831,6 +839,7 @@
 	public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
 	public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final synthetic fun filterIsInstance (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -849,18 +858,22 @@
 	public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun fold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 	public static final fun getDEFAULT_CONCURRENCY ()I
+	public static final fun launchIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
 	public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
-	public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
-	public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
 	public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
-	public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
+	public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
 	public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun retryWhen (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
 	public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
@@ -891,21 +904,45 @@
 	public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun onErrorResumeNext (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+	public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
+	public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
 	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
-	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)V
-	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
+	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
+	public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
 	public static final fun subscribeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
 }
 
+public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
+	public final field capacity I
+	public final field context Lkotlin/coroutines/CoroutineContext;
+	public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
+	public fun additionalToStringProps ()Ljava/lang/String;
+	public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
+	public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+	protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
+	public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
+	public fun toString ()Ljava/lang/String;
+	public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
+	public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
+}
+
 public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
 	public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
 	public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
 }
 
+public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector {
+	public fun <init> (Lkotlinx/coroutines/channels/SendChannel;)V
+	public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
 public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
 	public synthetic fun <init> (II)V
 	public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
@@ -979,6 +1016,14 @@
 	public abstract fun trySelect (Ljava/lang/Object;)Z
 }
 
+public final class kotlinx/coroutines/selects/SelectKt {
+	public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
+public final class kotlinx/coroutines/selects/SelectUnbiasedKt {
+	public static final fun selectUnbiased (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
 public final class kotlinx/coroutines/selects/UnbiasedSelectBuilderImpl : kotlinx/coroutines/selects/SelectBuilder {
 	public fun <init> (Lkotlin/coroutines/Continuation;)V
 	public final fun getClauses ()Ljava/util/ArrayList;
@@ -992,6 +1037,10 @@
 	public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
 }
 
+public final class kotlinx/coroutines/selects/WhileSelectKt {
+	public static final fun whileSelect (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
 public abstract interface class kotlinx/coroutines/sync/Mutex {
 	public abstract fun getOnLock ()Lkotlinx/coroutines/selects/SelectClause2;
 	public abstract fun holdsLock (Ljava/lang/Object;)Z
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
index 7191ca1..2afa313 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt
@@ -31,6 +31,5 @@
 public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
 	public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
 	public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
-	public static synthetic fun from$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
 }
 
diff --git a/gradle.properties b/gradle.properties
index 643d36a..bf4a937 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,11 +1,11 @@
 # Kotlin
-version=1.3.0-M1-SNAPSHOT
+version=1.3.0-M2-SNAPSHOT
 group=org.jetbrains.kotlinx
-kotlin_version=1.3.31
+kotlin_version=1.3.40
 
 # Dependencies
 junit_version=4.12
-atomicfu_version=0.12.8
+atomicfu_version=0.12.9
 html_version=0.6.8
 lincheck_version=2.0
 dokka_version=0.9.16-rdev-2-mpp-hacks
diff --git a/kotlinx-coroutines-core/common/src/Annotations.kt b/kotlinx-coroutines-core/common/src/Annotations.kt
index 321db60..5ee89b8 100644
--- a/kotlinx-coroutines-core/common/src/Annotations.kt
+++ b/kotlinx-coroutines-core/common/src/Annotations.kt
@@ -31,6 +31,7 @@
 @MustBeDocumented
 @Retention(value = AnnotationRetention.BINARY)
 @Experimental(level = Experimental.Level.WARNING)
+@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.TYPEALIAS, AnnotationTarget.PROPERTY)
 public annotation class FlowPreview
 
 /**
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 294044d..5c01d00 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -204,16 +204,17 @@
  */
 @FlowPreview
 @Deprecated(
-    message = "Use channelFlow instead",
-    level = DeprecationLevel.WARNING,
-    replaceWith = ReplaceWith("channelFlow(block)")
+    message = "Use channelFlow with awaitClose { } instead of flowViaChannel and invokeOnClose { }.",
+    level = DeprecationLevel.WARNING
 )
+@Suppress("DeprecatedCallableAddReplaceWith")
 public fun <T> flowViaChannel(
     bufferSize: Int = BUFFERED,
     @BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
 ): Flow<T> {
     return channelFlow<T> {
         block(channel)
+        awaitClose()
     }.buffer(bufferSize)
 }
 
@@ -327,4 +328,4 @@
 
     override fun toString(): String =
         "block[$block] -> ${super.toString()}"
-}
\ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt
index a60598c..b810305 100644
--- a/kotlinx-coroutines-core/common/src/flow/Flow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt
@@ -12,13 +12,18 @@
  * A cold asynchronous data stream that sequentially emits values
  * and completes normally or with an exception.
  *
- * _Cold flow_ means that intermediate operators on a flow such as [map] and [filter] do not trigger its execution,
- * which is only done by terminal operators like [single]. By default, flows are _sequential_ and all flow
- * operations are executed sequentially in the same coroutine, see [buffer] for details.
+ * _Intermediate operators_ on the flow such as [map], [filter], [take], [zip], etc. are functions that are
+ * applied to the _upstream_ flow or flows and return a _downstream_ flow to which further operators can be applied.
+ * Intermediate operations do not execute any code in the flow and are not suspending functions themselves.
+ * They only set up a chain of operations for future execution and quickly return.
+ * This is known as a _cold flow_ property.
  *
- * _Collecting the flow_ means executing all its operations.
- * Flow values can be collected in a suspending manner without actual blocking using the [collect] extension that
- * completes normally or exceptionally:
+ * _Terminal operators_ on the flow are either suspending functions such as [collect], [single], [reduce], [toList], etc.
+ * or the [launchIn] operator that starts collection of the flow in the given scope.
+ * They are applied to the upstream flow and trigger execution of all operations.
+ * Execution of the flow is also called _collecting the flow_ and is always performed in a suspending manner
+ * without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed
+ * execution of all the flow operations in the upstream. The most basic terminal operator is [collect], for example:
  *
  * ```
  * try {
@@ -30,10 +35,13 @@
  * }
  * ```
  *
- * Additionally, the library provides a rich set of terminal operators such as [single], [reduce] and others.
+ * By default, flows are _sequential_ and all flow operations are executed sequentially in the same coroutine,
+ * with an exception for a few operations specifically designed to introduce concurrency into flow
+ * execution, such as [buffer] and [flatMapMerge]. See their documentation for details.
  *
- * Flows don't carry information whether they are cold streams (which can be collected repeatedly and
- * trigger their evaluation every time [collect] is executed) or hot ones, but, conventionally, they represent cold streams.
+ * The Flow interface does not carry information on whether a flow is truly a cold stream that can be collected repeatedly and
+ * triggers execution of the same code every time it is collected or if it is a hot stream that emits different
+ * values from the same running source on each collection. However, conventionally flows represent cold streams.
  * Transitions between hot and cold streams are supported via channels and the corresponding API:
  * [channelFlow], [produceIn], [broadcastIn].
  *
@@ -48,7 +56,18 @@
  * * [channelFlow { ... }][channelFlow] builder function to construct arbitrary flows from
  *   potentially concurrent calls to [send][kotlinx.coroutines.channels.SendChannel.send] function.
  *
- * ### Flow context
+ * ### Flow constraints
+ *
+ * All implementations of the `Flow` interface must adhere to two key properties that are described in detail below:
+ *
+ * * Context preservation.
+ * * Exception transparency.
+ *
+ * These properties ensure the ability to perform local reasoning about the code with flows and modularize the code
+ * in such a way that upstream flow emitters can be developed separately from downstream flow collectors.
+ * A user of the flow does not need to know implementation details of the upstream flows it uses.
+ *
+ * ### Context preservation
  *
  * The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks
  * it downstream, thus making reasoning about the execution context of particular transformations or terminal
@@ -77,7 +96,7 @@
  * }
  * ```
  *
- * From the implementation point of view it means that all flow implementations should
+ * From the implementation point of view, it means that all flow implementations should
  * emit only from the same coroutine.
  * This constraint is efficiently enforced by the default [flow] builder.
  * The [flow] builder should be used if flow implementation does not start any coroutines.
@@ -108,20 +127,46 @@
  *  - Collecting another flow from a separate context is allowed, but it has the same effect as
  *    [flowOn] operator on that flow, which is more efficient.
  *
+ * ### Exception transparency
+ *
+ * Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint
+ * it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into
+ * `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with
+ * the [catch][Flow.catch] operator, which is designed to catch only exceptions coming from upstream flows while passing
+ * all the downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
+ * throw any unhandled exception that occurs in their code or in upstream flows, for example:
+ *
+ * ```
+ * flow { emitData() }
+ *     .map { computeOne(it) }
+ *     .catch { ... } // catches exceptions in emitData and computeOne
+ *     .map { computeTwo(it) }
+ *     .collect { process(it) } // throws exceptions from process and computeTwo
+ * ```
+ * The same reasoning can be applied to the [onCompletion] operator, which is a declarative replacement for a `finally` block.
+ *
+ * Failure to adhere to the exception transparency requirement would result in strange behaviours that would make
+ * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
+ * by the upstream flow, limiting the ability of local reasoning about the code.
+ *
+ * Currently, the flow infrastructure does not enforce exception transparency contracts; however, they might be enforced
+ * in the future either at run time or at compile time.
+ *
+ * ### Reactive streams
+ *
  * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
- * reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from kotlinx-coroutines-reactive module.
+ * reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
  */
 @ExperimentalCoroutinesApi
 public interface Flow<out T> {
-
     /**
      * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
      * This method should never be implemented or used directly.
      *
      * The only way to implement flow interface directly is to extend [AbstractFlow].
-     * To collect it into the specific collector, either `collector.emitAll(flow)` or `collect { }` extension should be used.
-     * Such limitation ensures that context preservation property is not violated and prevents most of the developer mistakes
-     * related to concurrency, inconsistent flow dispatchers and cancellation.
+     * To collect it into the specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
+     * should be used. Such limitation ensures that context preservation property is not violated and prevents most
+     * of the developer mistakes related to concurrency, inconsistent flow dispatchers and cancellation.
      */
     @InternalCoroutinesApi
     public suspend fun collect(collector: FlowCollector<T>)
@@ -129,8 +174,11 @@
 
 /**
  * Base class to extend to have a stateful implementation of the flow.
- * It tracks all the properties required for context preservation and throws [IllegalStateException] if any of the properties are violated.
+ * It tracks all the properties required for context preservation and throws [IllegalStateException]
+ * if any of the properties are violated.
+ *
  * Example of the implementation:
+ *
  * ```
  * // list.asFlow() + collect counter
  * class CountingListFlow(private val values: List<Int>) : AbstractFlow<Int>() {
diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt
index 114a32e..bc45b01 100644
--- a/kotlinx-coroutines-core/common/src/flow/Migration.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt
@@ -105,11 +105,18 @@
 /** @suppress **/
 @Deprecated(
     level = DeprecationLevel.ERROR,
-    message = "Flow analogue is named onErrorCollect",
-    replaceWith = ReplaceWith("onErrorCollect(fallback)")
+    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
+    replaceWith = ReplaceWith("catch { emitAll(fallback) }")
 )
 public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = error("Should not be called")
 
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
+    replaceWith = ReplaceWith("catch { emitAll(fallback) }")
+)
+public fun <T> Flow<T>.onErrorResumeNext(fallback: Flow<T>): Flow<T> = error("Should not be called")
+
 /**
  * Self-explanatory, the reason of deprecation is "context preservation" property (you can read more in [Flow] documentation)
  * @suppress
@@ -120,7 +127,7 @@
 
 /**
  * `subscribe` is Rx-specific API that has no direct match in flows.
- * One can use `launch` instead, for example the following:
+ * One can use [launchIn] instead, for example the following:
  * ```
  * flowable
  *     .observeOn(Schedulers.io())
@@ -129,30 +136,36 @@
  *
  * has the following Flow equivalent:
  * ```
- * launch(Dispatchers.IO) {
- *     try {
- *         flow.collect { value ->
- *             println("Received $value")
- *         }
- *         println("Flow is completed successfully")
- *     } catch (e: Throwable) {
- *         println("Exception $e happened")
- *     }
- * }
+ * flow
+ *     .onEach { value -> println("Received $value") }
+ *     .onCompletion { cause -> if (cause == null) println("Flow is completed successfully") }
+ *     .catch { cause -> println("Exception $cause happened") }
+ *     .flowOn(Dispatchers.IO)
+ *     .launchIn(myScope)
  * ```
- * But most of the time it is better to use terminal operators like [single] instead of [collect].
+ *
+ * Note that the resulting value of [launchIn] is not used because the provided scope takes care of cancellation.
+ *
+ * Alternatively, terminal operators like [single] can be used from suspend functions.
  * @suppress
  */
-@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
+@Deprecated(
+    message = "Use launchIn with onEach, onCompletion and catch operators instead",
+    level = DeprecationLevel.ERROR
+)
 public fun <T> Flow<T>.subscribe(): Unit = error("Should not be called")
 
 /** @suppress **/
-@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
-public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit): Unit = error("Should not be called")
+@Deprecated(
+    message = "Use launchIn with onEach, onCompletion and catch operators instead",
+    level = DeprecationLevel.ERROR
+)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit): Unit = error("Should not be called")
 
 /** @suppress **/
-@Deprecated(message = "Use launch + collect instead", level = DeprecationLevel.ERROR)
-public fun <T> Flow<T>.subscribe(onEach: (T) -> Unit, onError: (Throwable) -> Unit): Unit = error("Should not be called")
+@Deprecated(
+    message = "Use launchIn with onEach, onCompletion and catch operators instead",
+    level = DeprecationLevel.ERROR
+)public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit, onError: suspend (Throwable) -> Unit): Unit = error("Should not be called")
 
 /**
  * Note that this replacement is sequential (`concat`) by default.
@@ -181,7 +194,7 @@
  */
 @Deprecated(
     level = DeprecationLevel.ERROR,
-    message = "Flow analogue is named flattenConcat",
+    message = "Flow analogue of 'merge' is 'flattenConcat'",
     replaceWith = ReplaceWith("flattenConcat()")
 )
 public fun <T> Flow<Flow<T>>.merge(): Flow<T> = error("Should not be called")
@@ -189,7 +202,7 @@
 /** @suppress **/
 @Deprecated(
     level = DeprecationLevel.ERROR,
-    message = "Flow analogue is named flattenConcat",
+    message = "Flow analogue of 'flatten' is 'flattenConcat'",
     replaceWith = ReplaceWith("flattenConcat()")
 )
 public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = error("Should not be called")
@@ -210,7 +223,7 @@
  */
 @Deprecated(
     level = DeprecationLevel.ERROR,
-    message = "Kotlin analogue of compose is 'let'",
+    message = "Flow analogue of 'compose' is 'let'",
     replaceWith = ReplaceWith("let(transformer)")
 )
 public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = error("Should not be called")
@@ -220,7 +233,7 @@
  */
 @Deprecated(
     level = DeprecationLevel.ERROR,
-    message = "Kotlin analogue of 'skip' is 'drop'",
+    message = "Flow analogue of 'skip' is 'drop'",
     replaceWith = ReplaceWith("drop(count)")
 )
 public fun <T> Flow<T>.skip(count: Int): Flow<T> = error("Should not be called")
@@ -246,3 +259,23 @@
     replaceWith = ReplaceWith("scan(initial, operation)")
 )
 public fun <T, R> Flow<T>.scanFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = error("Should not be called")
+
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emit(fallback) }'",
+    replaceWith = ReplaceWith("catch { emit(fallback) }")
+)
+// Note: this version without predicate gives better "replaceWith" action
+public fun <T> Flow<T>.onErrorReturn(fallback: T): Flow<T> = error("Should not be called")
+
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { e -> if (predicate(e)) emit(fallback) else throw e }'",
+    replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emit(fallback) else throw e }")
+)
+public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: (Throwable) -> Boolean = { true }): Flow<T> =
+    catch { e ->
+        // Note: default value is for binary compatibility with preview version, that is why it has body
+        if (!predicate(e)) throw e
+        emit(fallback)
+    }
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 57a0132..e676869 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -15,8 +15,13 @@
 internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
     this as? ChannelFlow ?: ChannelFlowOperatorImpl(this)
 
-// Operators that use channels extend this ChannelFlow and are always fused with each other
-internal abstract class ChannelFlow<T>(
+/**
+ * Operators that use channels extend this ChannelFlow and are always fused with each other.
+ *
+ * @suppress **This is an internal API and should not be used from general code.**
+ */
+@InternalCoroutinesApi
+public abstract class ChannelFlow<T>(
     // upstream context
     @JvmField val context: CoroutineContext,
     // buffer capacity between upstream and downstream context
@@ -62,7 +67,7 @@
     fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
         scope.broadcast(context, produceCapacity, start, block = collectToFun)
 
-    fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
+    open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
         scope.flowProduce(context, produceCapacity, block = collectToFun)
 
     override suspend fun collect(collector: FlowCollector<T>) =
@@ -134,8 +139,8 @@
 // Now if the underlying collector was accepting concurrent emits, then this one is too
 // todo: we might need to generalize this pattern for "thread-safe" operators that can fuse with channels
 private fun <T> FlowCollector<T>.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector<T> = when (this) {
-    // SendingCollector does not care about the context at all so can be used as it
-    is SendingCollector -> this
+    // SendingCollector & NopCollector do not care about the context at all and can be used as is
+    is SendingCollector, is NopCollector -> this
     // Original collector is concurrent, so wrap into ConcurrentUndispatchedContextCollector (also concurrent)
     is ConcurrentFlowCollector -> ConcurrentUndispatchedContextCollector(this, emitContext)
     // Otherwise just wrap into UndispatchedContextCollector interface implementation
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt b/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt
index 6119d3d..f37cc1c 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt
@@ -5,6 +5,7 @@
 package kotlinx.coroutines.flow.internal
 
 import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.channels.ArrayChannel
 import kotlinx.coroutines.flow.*
@@ -17,8 +18,13 @@
 // Two basic implementations are here: SendingCollector and ConcurrentFlowCollector
 internal interface ConcurrentFlowCollector<T> : FlowCollector<T>
 
-// Concurrent collector because it sends to a channel
-internal class SendingCollector<T>(
+/**
+ * Collector that sends to a channel. It is marked as [ConcurrentFlowCollector] because it can be used concurrently.
+ *
+ * @suppress **This is an internal API and should not be used from general code.**
+ */
+@InternalCoroutinesApi
+public class SendingCollector<T>(
     private val channel: SendChannel<T>
 ) : ConcurrentFlowCollector<T> {
     override suspend fun emit(value: T) = channel.send(value)
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
index 98f5cec..258869f 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -70,7 +70,6 @@
     context: CoroutineContext,
     uCont: Continuation<T>
 ) : ScopeCoroutine<T>(context, uCont) {
-
     public override fun childCancelled(cause: Throwable): Boolean {
         if (cause is ChildCancelledException) return true
         return cancelImpl(cause)
@@ -81,7 +80,6 @@
     parentContext: CoroutineContext,
     channel: Channel<T>
 ) : ProducerCoroutine<T>(parentContext, channel) {
-
     public override fun childCancelled(cause: Throwable): Boolean {
         if (cause is ChildCancelledException) return true
         return cancelImpl(cause)
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt
new file mode 100644
index 0000000..297d4d1
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt
@@ -0,0 +1,13 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.flow.*
+
+internal object NopCollector : ConcurrentFlowCollector<Any?> {
+    override suspend fun emit(value: Any?) {
+        // does nothing
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
index 8ccf7cf..c9aa555 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
@@ -151,7 +151,7 @@
 public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
 
 /**
- * The operator that changes the context where this flow is executed to the given [context].
+ * Changes the context where this flow is executed to the given [context].
  * This operator is composable and affects only preceding operators that do not have its own context.
  * This operator is context preserving: [context] **does not** leak into the downstream flow.
  *
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
index 29777b7..c744842 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
@@ -13,90 +13,216 @@
 import kotlin.jvm.*
 import kotlinx.coroutines.flow.unsafeFlow as flow
 
-public typealias ExceptionPredicate = (Throwable) -> Boolean
+/**
+ * Catches exceptions in the flow completion and calls a specified [action] with
+ * the caught exception. This operator is *transparent* to exceptions that occur
+ * in downstream flow and does not catch exceptions that are thrown to cancel the flow.
+ *
+ * For example:
+ *
+ * ```
+ * flow { emitData() }
+ *     .map { computeOne(it) }
+ *     .catch { ... } // catches exceptions in emitData and computeOne
+ *     .map { computeTwo(it) }
+ *     .collect { process(it) } // throws exceptions from process and computeTwo
+ * ```
+ *
+ * Conceptually, the action of `catch` operator is similar to wrapping the code of upstream flows with
+ * `try { ... } catch (e: Throwable) { action(e) }`.
+ *
+ * Any exception in the [action] code itself proceeds downstream where it can be
+ * caught by further `catch` operators if needed. If a particular exception does not need to be
+ * caught it can be rethrown from the action of `catch` operator. For example:
+ *
+ * ```
+ * flow.catch { e ->
+ *     if (e !is IOException) throw e // rethrow all but IOException
+ *     // e is IOException here
+ *     ...
+ * }
+ * ```
+ *
+ * The [action] code has [FlowCollector] as a receiver and can [emit][FlowCollector.emit] values downstream.
+ * For example, caught exception can be replaced with some wrapper value for errors:
+ *
+ * ```
+ * flow.catch { e -> emit(ErrorWrapperValue(e)) }
+ * ```
+ *
+ * The [action] can also use [emitAll] to fall back on some other flow in case of an error. However, to
+ * retry the original flow, use the [retryWhen] operator, which can retry the flow multiple times without
+ * introducing an ever-growing stack of suspending calls.
+ */
+@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
+public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
+    flow {
+        val exception = catchImpl(this)
+        if (exception != null) action(exception)
+    }
 
-private val ALWAYS_TRUE: ExceptionPredicate = { true }
+/**
+ * @suppress **Deprecated**: Use `(Throwable) -> Boolean` functional type
+ */
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Use (Throwable) -> Boolean functional type",
+    replaceWith = ReplaceWith("(Throwable) -> Boolean")
+)
+public typealias ExceptionPredicate = (Throwable) -> Boolean
 
 /**
  * Switches to the [fallback] flow if the original flow throws an exception that matches the [predicate].
  * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
+ *
+ * @suppress **Deprecated**: Use `catch { e -> if (predicate(e)) emitAll(fallback) else throw e }`
  */
-@FlowPreview
+@Deprecated(
+    level = DeprecationLevel.ERROR,
+    message = "Use catch { e -> if (predicate(e)) emitAll(fallback) else throw e }",
+    replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emitAll(fallback) else throw e }")
+)
 public fun <T> Flow<T>.onErrorCollect(
     fallback: Flow<T>,
-    predicate: ExceptionPredicate = ALWAYS_TRUE
-): Flow<T> = collectSafely { e ->
+    predicate: (Throwable) -> Boolean = { true }
+): Flow<T> = catch { e ->
     if (!predicate(e)) throw e
     emitAll(fallback)
 }
 
 /**
- * Emits the [fallback] value and finishes successfully if the original flow throws exception that matches the given [predicate].
- * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
+ * Retries collection of the given flow up to [retries] times when an exception that matches the
+ * given [predicate] occurs in the upstream flow. This operator is *transparent* to exceptions that occur
+ * in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
+ *
+ * See [catch] for details on how exceptions are caught in flows.
+ *
+ * The default value of [retries] parameter is [Long.MAX_VALUE]. This value effectively means to retry forever.
+ * This operator is a shorthand for the following code (see [retryWhen]). Note that `attempt` is checked first
+ * and [predicate] is not called when it reaches the given number of [retries]:
+ *
+ * ```
+ * retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
+ * ```
+ *
+ * By default the [predicate] always returns true. The [predicate] is a suspending function,
+ * so it can also be used to introduce a delay before retrying, for example:
+ *
+ * ```
+ * flow.retry(3) { e ->
+ *     // retry on any IOException but also introduce delay if retrying
+ *     (e is IOException).also { if (it) delay(1000) }
+ * }
+ * ```
+ *
+ * @throws IllegalArgumentException when [retries] is not positive.
  */
-@FlowPreview
-public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: ExceptionPredicate = ALWAYS_TRUE): Flow<T> =
-    collectSafely { e ->
-        if (!predicate(e)) throw e
-        emit(fallback)
-    }
-
-/**
- * Operator that retries [n][retries] times to collect the given flow in an exception that matches the given [predicate] occurs
- * in the given flow. Exceptions from collectors of this flow are not retried.
- * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
- */
-@FlowPreview
+@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
 public fun <T> Flow<T>.retry(
-    retries: Int = Int.MAX_VALUE,
-    predicate: ExceptionPredicate = ALWAYS_TRUE
+    retries: Long = Long.MAX_VALUE,
+    predicate: suspend (cause: Throwable) -> Boolean = { true }
 ): Flow<T> {
     require(retries > 0) { "Expected positive amount of retries, but had $retries" }
-    return flow {
-        @Suppress("NAME_SHADOWING")
-        var retries = retries
-        // Note that exception may come from the downstream operators, we should not switch on that
-        while (true) {
-            var fromDownstream = false
-            try {
-                collect { value ->
-                    try {
-                        emit(value)
-                    } catch (e: Throwable) {
-                        fromDownstream = true
-                        throw e
-                    }
+    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
+}
+
+@FlowPreview
+@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility with retries: Int preview version")
+public fun <T> Flow<T>.retry(
+    retries: Int = Int.MAX_VALUE,
+    predicate: (Throwable) -> Boolean = { true }
+): Flow<T> {
+    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
+    return retryWhen { cause, attempt -> predicate(cause) && attempt < retries }
+}
+
+/**
+ * Retries collection of the given flow when an exception occurs in the upstream flow and the
+ * [predicate] returns true. The predicate also receives an `attempt` number as parameter,
+ * starting from zero on the initial call. This operator is *transparent* to exceptions that occur
+ * in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
+ *
+ * For example, the following call retries the flow forever if the error is caused by `IOException`, but
+ * stops after 3 retries on any other exception:
+ *
+ * ```
+ * flow.retryWhen { cause, attempt -> cause is IOException || attempt < 3 }
+ * ```
+ *
+ * To implement a simple retry logic with a limit on the number of retries use [retry] operator.
+ *
+ * Similarly to [catch] operator, the [predicate] code has [FlowCollector] as a receiver and can
+ * [emit][FlowCollector.emit] values downstream.
+ * The [predicate] is a suspending function, so it can be used to introduce delay before retry, for example:
+ *
+ * ```
+ * flow.retryWhen { cause, attempt ->
+ *     if (cause is IOException) {    // retry on IOException
+ *         emit(RetryWrapperValue(cause))
+ *         delay(1000)                // delay for one second before retry
+ *         true
+ *     } else {                       // do not retry otherwise
+ *         false
+ *     }
+ * }
+ * ```
+ *
+ * See [catch] for more details.
+ */
+@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
+public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
+    flow {
+        var attempt = 0L
+        var shallRetry: Boolean
+        do {
+            shallRetry = false
+            val cause = catchImpl(this)
+            if (cause != null) {
+                if (predicate(cause, attempt)) {
+                    shallRetry = true
+                    attempt++
+                } else {
+                    throw cause
                 }
-                break
+            }
+        } while (shallRetry)
+    }
+
+// Return exception from upstream or null
+internal suspend fun <T> Flow<T>.catchImpl(
+    collector: FlowCollector<T>
+): Throwable? {
+    var fromDownstream: Throwable? = null
+    try {
+        collect {
+            try {
+                collector.emit(it)
             } catch (e: Throwable) {
-                if (fromDownstream || e.isCancellationCause(coroutineContext)) throw e
-                if (!predicate(e) || retries-- == 0) throw e
+                fromDownstream = e
+                throw e
             }
         }
+    } catch (e: Throwable) {
+        /*
+         * First check ensures that we catch an original exception, not one rethrown by an operator.
+         * Second check ignores cancellation causes, they cannot be caught.
+         */
+        if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
+            throw e // Rethrow exceptions from downstream and cancellation causes
+        } else {
+            return e // not from downstream
+        }
     }
+    return null
 }
 
 private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean {
     val job = coroutineContext[Job]
     if (job == null || !job.isCancelled) return false
-    return unwrap(job.getCancellationException()) == unwrap(this)
+    return isSameExceptionAs(job.getCancellationException())
 }
 
-private fun <T> Flow<T>.collectSafely(onException: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T> =
-    flow {
-        // Note that exception may come from the downstream operators, we should not switch on that
-        var fromDownstream = false
-        try {
-            collect {
-                try {
-                    emit(it)
-                } catch (e: Throwable) {
-                    fromDownstream = true
-                    throw e
-                }
-            }
-        } catch (e: Throwable) {
-            if (fromDownstream || e.isCancellationCause(coroutineContext)) throw e
-            onException(e)
-        }
-    }
+private fun Throwable.isSameExceptionAs(other: Throwable?): Boolean =
+    other != null && unwrap(other) == unwrap(this)
+
+
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
index b10349e..f6b32b3 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
@@ -9,7 +9,8 @@
 package kotlinx.coroutines.flow
 
 import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.internal.NULL
+import kotlinx.coroutines.flow.internal.*
+import kotlin.coroutines.*
 import kotlin.jvm.*
 import kotlinx.coroutines.flow.unsafeFlow as flow
 
@@ -101,6 +102,64 @@
 }
 
 /**
+ * Invokes the given [action] when the given flow is completed or cancelled, using
+ * the exception from the upstream (if any) as cause parameter of [action].
+ *
+ * Conceptually, [onCompletion] is similar to wrapping the flow collection into a `finally` block,
+ * for example the following imperative snippet:
+ * ```
+ * try {
+ *     myFlow.collect { value ->
+ *         println(value)
+ *     }
+ * } finally {
+ *     println("Done")
+ * }
+ * ```
+ *
+ * can be replaced with a declarative one using [onCompletion]:
+ * ```
+ * myFlow
+ *     .onEach { println(it) }
+ *     .onCompletion { println("Done") }
+ *     .collect()
+ * ```
+ *
+ * This operator is *transparent* to exceptions that occur in downstream flow
+ * and does not observe exceptions that are thrown to cancel the flow,
+ * while any exception from the [action] will be thrown downstream.
+ * This behaviour can be demonstrated by the following example:
+ * ```
+ * flow { emitData() }
+ *     .map { computeOne(it) }
+ *     .onCompletion { println(it) } // Can print exceptions from emitData and computeOne
+ *     .map { computeTwo(it) }
+ *     .onCompletion { println(it) } // Can print exceptions from emitData, computeOne, onCompletion and computeTwo
+ *     .collect()
+ * ```
+ */
+@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
+public fun <T> Flow<T>.onCompletion(action: suspend (cause: Throwable?) -> Unit): Flow<T> = flow {
+    var exception: Throwable? = null
+    try {
+        exception = catchImpl(this)
+    } finally {
+        // Separate method because of KT-32220
+        invokeSafely(action, exception)
+        exception?.let { throw it }
+    }
+}
+
+private suspend fun invokeSafely(action: suspend (cause: Throwable?) -> Unit, cause: Throwable?) {
+    try {
+        action(cause)
+    } catch (e: Throwable) {
+        if (cause !== null) e.addSuppressedThrowable(cause)
+        throw e
+    }
+}
+
+/**
  * Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
  * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
  * For example:
diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
index c7f8f2e..a98b7f3 100644
--- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
+++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
@@ -8,13 +8,55 @@
 package kotlinx.coroutines.flow
 
 import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.internal.*
 import kotlin.jvm.*
 
 /**
+ * Terminal flow operator that collects the given flow but ignores all emitted values.
+ * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
+ *
+ * It is a shorthand for `collect {}`.
+ *
+ * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and
+ * handle an exception that might occur in the upstream flow or during processing, for example:
+ *
+ * ```
+ * flow
+ *     .onEach { value -> process(value) }
+ *     .catch { e -> handleException(e) }
+ *     .collect() // trigger collection of the flow
+ * ```
+ */
+@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
+public suspend fun Flow<*>.collect() = collect(NopCollector)
+
+/**
+ * Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope].
+ * It is a shorthand for `scope.launch { flow.collect() }`.
+ *
+ * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and
+ * handle an exception that might occur in the upstream flow or during processing, for example:
+ * ```
+ * flow
+ *     .onEach { value -> updateUi(value) }
+ *     .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
+ *     .catch { cause -> LOG.error("Exception: $cause") }
+ *     .launchIn(uiScope)
+ * ```
+ *
+ * Note that the resulting value of [launchIn] is not used because the provided scope takes care of cancellation.
+ */
+@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
+public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
+    collect() // tail-call
+}
+
+/**
  * Terminal flow operator that collects the given flow with a provided [action].
  * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
  *
  * Example of use:
+ *
  * ```
  * val flow = getMyEvents()
  * try {
@@ -35,7 +77,8 @@
 
 /**
  * Collects all the values from the given [flow] and emits them to the collector.
- * Shortcut for `flow.collect { value -> emit(value) }`.
+ * 
+ * It is a shorthand for `flow.collect { value -> emit(value) }`.
  */
 @ExperimentalCoroutinesApi
 public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) = flow.collect(this)
diff --git a/kotlinx-coroutines-core/common/test/TestBase.common.kt b/kotlinx-coroutines-core/common/test/TestBase.common.kt
index cef74c1..50df19a 100644
--- a/kotlinx-coroutines-core/common/test/TestBase.common.kt
+++ b/kotlinx-coroutines-core/common/test/TestBase.common.kt
@@ -46,7 +46,7 @@
 
 public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>) {
     try {
-        flow.collect { /* Do nothing */ }
+        flow.collect()
         fail("Should be unreached")
     } catch (e: Throwable) {
         assertTrue(e is T)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt
new file mode 100644
index 0000000..802ba1e
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class CatchTest : TestBase() {
+    @Test
+    fun testCatchEmit() = runTest {
+        val flow = flow {
+            emit(1)
+            throw TestException()
+        }
+
+        assertEquals(42, flow.catch { emit(41) }.sum())
+        assertFailsWith<TestException>(flow)
+    }
+
+    @Test
+    fun testCatchEmitExceptionFromDownstream() = runTest {
+        var executed = 0
+        val flow = flow {
+            emit(1)
+        }.catch { emit(42) }.map {
+            ++executed
+            throw TestException()
+        }
+
+        assertFailsWith<TestException>(flow)
+        assertEquals(1, executed)
+    }
+
+    @Test
+    fun testCatchEmitAll() = runTest {
+        val flow = flow {
+            emit(1)
+            throw TestException()
+        }.catch { emitAll(flowOf(2)) }
+
+        assertEquals(3, flow.sum())
+    }
+
+    @Test
+    fun testCatchEmitAllExceptionFromDownstream() = runTest {
+        var executed = 0
+        val flow = flow {
+            emit(1)
+        }.catch { emitAll(flowOf(1, 2, 3)) }.map {
+            ++executed
+            throw TestException()
+        }
+
+        assertFailsWith<TestException>(flow)
+        assertEquals(1, executed)
+    }
+
+    @Test
+    fun testWithTimeoutCatch() = runTest {
+        val flow = flow<Int> {
+            withTimeout(1) {
+                hang { expect(1) }
+            }
+            expectUnreached()
+        }.catch { emit(1) }
+
+        assertEquals(1, flow.single())
+        finish(2)
+    }
+
+    @Test
+    fun testCancellationFromUpstreamCatch() = runTest {
+        val flow = flow<Int> {
+            hang {  }
+        }.catch { expectUnreached() }
+
+        val job = launch {
+            expect(1)
+            flow.collect {  }
+        }
+
+        yield()
+        expect(2)
+        job.cancelAndJoin()
+        finish(3)
+    }
+
+    @Test
+    fun testCatchContext() = runTest {
+        expect(1)
+        val flow = flow {
+            expect(2)
+            emit("OK")
+            expect(3)
+            throw TestException()
+        }
+        val d0 = coroutineContext[ContinuationInterceptor] as CoroutineContext
+        val d1 = wrapperDispatcher(coroutineContext)
+        val d2 = wrapperDispatcher(coroutineContext)
+        flow
+            .catch { e ->
+                expect(4)
+                assertTrue(e is TestException)
+                assertEquals("A", kotlin.coroutines.coroutineContext[CoroutineName]?.name)
+                assertSame(d1, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
+                throw e // rethrow downstream
+            }
+            .flowOn(CoroutineName("A"))
+            .catch { e ->
+                expect(5)
+                assertTrue(e is TestException)
+                assertEquals("B", kotlin.coroutines.coroutineContext[CoroutineName]?.name)
+                assertSame(d1, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
+                throw e // rethrow downstream
+            }
+            .flowOn(CoroutineName("B"))
+            .catch { e ->
+                expect(6)
+                assertTrue(e is TestException)
+                assertSame(d1, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
+                throw e // rethrow downstream
+            }
+            .flowOn(d1)
+            .catch { e ->
+                expect(7)
+                assertTrue(e is TestException)
+                assertSame(d2, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
+                throw e // rethrow downstream
+            }
+            .flowOn(d2)
+            // flowOn with a different dispatcher introduces asynchrony so that all exceptions in the
+            // upstream flows are handled before they go downstream
+            .onEach { value ->
+                expect(8)
+                assertEquals("OK", value)
+            }
+            .catch { e ->
+                expect(9)
+                assertTrue(e is TestException)
+                assertSame(d0, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
+            }
+            .collect()
+        finish(10)
+    }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DropTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DropTest.kt
index 0c17c5d..1c5a305 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/DropTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/DropTest.kt
@@ -52,7 +52,7 @@
                 expect(4)
                 throw TestException()
                 42
-            }.onErrorReturn(42)
+            }.catch { emit(42) }
 
         expect(1)
         assertEquals(42, flow.single())
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FilterTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FilterTest.kt
index ee84d40..3de5d54 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FilterTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FilterTest.kt
@@ -39,7 +39,7 @@
             latch.receive()
             throw TestException()
             true
-        }.onErrorReturn(42)
+        }.catch { emit(42) }
 
         assertEquals(42, flow.single())
         assertTrue(cancelled)
@@ -75,7 +75,7 @@
             latch.receive()
             throw TestException()
             true
-        }.onErrorReturn(42)
+        }.catch { emit(42) }
 
         assertEquals(42, flow.single())
         assertTrue(cancelled)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/MapNotNullTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/MapNotNullTest.kt
index f9a4e63..893811d 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/MapNotNullTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/MapNotNullTest.kt
@@ -43,7 +43,7 @@
             latch.receive()
             throw TestException()
             it + 1
-        }.onErrorReturn(42)
+        }.catch { emit(42) }
 
         assertEquals(42, flow.single())
         assertTrue(cancelled)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/MapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/MapTest.kt
index a7f1088..c744404 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/MapTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/MapTest.kt
@@ -38,11 +38,10 @@
                 }
                 emit(1)
             }
-        }.map {
+        }.onEach {
             latch.receive()
             throw TestException()
-            it + 1
-        }.onErrorReturn(42)
+        }.catch { emit(42) }
 
         assertEquals(42, flow.single())
         assertTrue(cancelled)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
new file mode 100644
index 0000000..ff29481
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class OnCompletionTest : TestBase() {
+
+    @Test
+    fun testOnCompletion() = runTest {
+        flow {
+            expect(1)
+            emit(2)
+            expect(4)
+        }.onEach {
+            expect(2)
+        }.onCompletion {
+            assertNull(it)
+            expect(5)
+        }.onEach {
+            expect(3)
+        }.collect()
+        finish(6)
+    }
+
+    @Test
+    fun testOnCompletionWithException() = runTest {
+        flowOf(1).onEach {
+            expect(1)
+            throw TestException()
+        }.onCompletion {
+            assertTrue(it is TestException)
+            expect(2)
+        }.catch {
+            assertTrue(it is TestException)
+            expect(3)
+        }.collect()
+        finish(4)
+    }
+
+    @Test
+    fun testOnCompletionWithExceptionDownstream() = runTest {
+        flow {
+            expect(1)
+            emit(2)
+        }.onEach {
+            expect(2)
+        }.onCompletion {
+            assertNull(it)
+            expect(4)
+        }.onEach {
+            expect(3)
+            throw TestException()
+        }.catch {
+            assertTrue(it is TestException)
+            expect(5)
+        }.collect()
+        finish(6)
+    }
+
+    @Test
+    fun testMultipleOnCompletions() = runTest {
+        flowOf(1).onCompletion {
+            assertNull(it)
+            expect(2)
+        }.onEach {
+            expect(1)
+            throw TestException()
+        }.onCompletion {
+            assertTrue(it is TestException)
+            expect(3)
+        }.catch {
+            assertTrue(it is TestException)
+            expect(4)
+        }.collect()
+        finish(5)
+    }
+
+    @Test
+    fun testExceptionFromOnCompletion() = runTest {
+        flowOf(1).onEach {
+            expect(1)
+            throw TestException()
+        }.onCompletion {
+            expect(2)
+            throw TestException2()
+        }.catch {
+            assertTrue(it is TestException2)
+            expect(3)
+        }.collect()
+        finish(4)
+    }
+
+    @Test
+    fun testContextPreservation() = runTest {
+        flowOf(1).onCompletion {
+            assertEquals("OK", NamedDispatchers.name())
+            assertNull(it)
+            expect(1)
+        }.flowOn(NamedDispatchers("OK"))
+            .onEach {
+                expect(2)
+                assertEquals("default", NamedDispatchers.nameOr("default"))
+                throw TestException()
+            }
+            .catch {
+                assertTrue(it is TestException)
+                expect(3)
+            }.collect()
+        finish(4)
+    }
+}
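Reviewer note: the ordering checked by `testOnCompletionWithException` can be reproduced outside the test harness. The sketch below is only an illustration, assuming a recent kotlinx.coroutines release on the classpath; `completionLog` is a hypothetical helper and `IllegalStateException` stands in for `TestException`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which onCompletion and catch observe a failure.
suspend fun completionLog(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1)
        .onEach { throw IllegalStateException("boom") }                 // upstream failure
        .onCompletion { cause -> log += "completed: ${cause?.message}" } // sees the cause first
        .catch { cause -> log += "caught: ${cause.message}" }            // then the exception is handled
        .collect()
    return log
}

fun main() = runBlocking {
    println(completionLog())
}
```

As in the test, `onCompletion` observes the upstream exception without handling it, so the `catch` placed after it still runs.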
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnEachTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnEachTest.kt
index 93dde39..bad9db9 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/OnEachTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/OnEachTest.kt
@@ -43,7 +43,7 @@
             latch.receive()
             throw TestException()
             it + 1
-        }.onErrorReturn(42)
+        }.catch { emit(42) }
 
         assertEquals(42, flow.single())
         assertTrue(cancelled)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnErrorTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnErrorTest.kt
deleted file mode 100644
index 32378ab..0000000
--- a/kotlinx-coroutines-core/common/test/flow/operators/OnErrorTest.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlin.test.*
-
-class OnErrorTest : TestBase() {
-    @Test
-    fun testOnErrorReturn() = runTest {
-        val flow = flow {
-            emit(1)
-            throw TestException()
-        }
-
-        assertEquals(42, flow.onErrorReturn(41).sum())
-        assertFailsWith<TestException>(flow)
-    }
-
-    @Test
-    fun testOnErrorReturnPredicate() = runTest {
-        val flow = flow { emit(1); throw TestException() }
-        assertFailsWith<TestException>(flow.onErrorReturn(42) { it !is TestException })
-    }
-
-    @Test
-    fun testOnErrorReturnExceptionFromDownstream() = runTest {
-        var executed = 0
-        val flow = flow {
-            emit(1)
-        }.onErrorReturn(42).map {
-            ++executed
-            throw TestException()
-        }
-
-        assertFailsWith<TestException>(flow)
-        assertEquals(1, executed)
-    }
-
-    @Test
-    fun testOnErrorCollect() = runTest {
-        val flow = flow {
-            emit(1)
-            throw TestException()
-        }.onErrorCollect(flowOf(2))
-
-        assertEquals(3, flow.sum())
-    }
-
-    @Test
-    fun testOnErrorCollectPredicate() = runTest {
-        val flow = flow { emit(1); throw TestException() }
-        assertFailsWith<TestException>(flow.onErrorCollect(flowOf(2)) { it !is TestException })
-    }
-
-    @Test
-    fun testOnErrorCollectExceptionFromDownstream() = runTest {
-        var executed = 0
-        val flow = flow {
-            emit(1)
-        }.onErrorCollect(flowOf(1, 2, 3)).map {
-            ++executed
-            throw TestException()
-        }
-
-        assertFailsWith<TestException>(flow)
-        assertEquals(1, executed)
-    }
-
-    @Test
-    fun testWithTimeoutOnError() = runTest {
-        val flow = flow<Int> {
-            withTimeout(1) {
-                hang { expect(1) }
-            }
-            expectUnreached()
-        }.onErrorReturn(1)
-
-        assertEquals(1, flow.single())
-        finish(2)
-    }
-
-    @Test
-    fun testCancellationFromUpstreamOnError() = runTest {
-        val flow = flow<Int> {
-            hang {  }
-        }.onErrorCollect(flow { expectUnreached() })
-
-        val job = launch {
-            expect(1)
-            flow.collect {  }
-        }
-
-        yield()
-        expect(2)
-        job.cancelAndJoin()
-        finish(3)
-    }
-}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt
index 8752970..b8a6b19 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt
@@ -9,6 +9,25 @@
 
 class RetryTest : TestBase() {
     @Test
+    fun testRetryWhen() = runTest {
+        expect(1)
+        val flow = flow {
+            emit(1)
+            throw TestException()
+        }
+        val sum = flow.retryWhen { cause, attempt ->
+            assertTrue(cause is TestException)
+            expect(2 + attempt.toInt())
+            attempt < 3
+        }.catch { cause ->
+            expect(6)
+            assertTrue(cause is TestException)
+        }.sum()
+        assertEquals(4, sum)
+        finish(7)
+    }
+
+    @Test
     fun testRetry() = runTest {
         var counter = 0
         val flow = flow {
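Reviewer note: `testRetryWhen` relies on `attempt` starting at zero, so the predicate `attempt < 3` allows three retries and four runs of the upstream in total. A minimal standalone sketch, assuming a recent kotlinx.coroutines release; `retried` is a hypothetical helper and `toList()` replaces the test-only `sum()`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the collected values and the number of upstream runs.
suspend fun retried(): Pair<List<Int>, Int> {
    var runs = 0
    val values = flow {
        runs++
        emit(1)
        throw IllegalStateException("boom")
    }
        .retryWhen { _, attempt -> attempt < 3 } // retry while attempt is 0, 1, or 2
        .catch { }                               // swallow the final failure
        .toList()
    return values to runs
}

fun main() = runBlocking {
    println(retried())
}
```

Each run emits one value before failing, which is why the test expects a sum of 4.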
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
index d739f1a..0108ea1 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
@@ -53,7 +53,7 @@
             expect(value) // 2
             latch.receive()
             throw TestException()
-        }.onErrorCollect(emptyFlow())
+        }.catch { /* ignore */ }
 
         assertEquals(1, flow.single())
         finish(4)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt
index a09f881..7110349 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt
@@ -70,7 +70,7 @@
             .map {
                 throw TestException()
                 42
-            }.onErrorReturn(42)
+            }.catch { emit(42) }
 
         assertEquals(42, flow.single())
         assertTrue(cancelled)
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TakeWhileTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TakeWhileTest.kt
index 63f1bff..c198356 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/TakeWhileTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/TakeWhileTest.kt
@@ -62,7 +62,6 @@
 
         assertFailsWith<TestException>(flow)
         assertTrue(cancelled)
-        assertEquals(42, flow.onErrorReturn(42).single())
-        assertEquals(42, flow.onErrorCollect(flowOf(42)).single())
+        assertEquals(42, flow.catch { emit(42) }.single())
     }
 }
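Reviewer note: the test changes above replace `onErrorReturn(42)` and `onErrorCollect(flowOf(42))` with `catch { emit(42) }`. The sketch below shows that migration pattern in isolation, including a rough stand-in for the removed predicate overloads. It assumes a recent kotlinx.coroutines release; `failingFlow`, `withFallback`, and `withSelectiveFallback` are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a flow that emits one value and then fails.
fun failingFlow(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("boom")
}

// Rough equivalent of the removed onErrorReturn(42): emit a fallback value from catch.
suspend fun withFallback(): List<Int> =
    failingFlow().catch { emit(42) }.toList()

// Rough equivalent of the removed predicate overload: recover selectively, rethrow otherwise.
suspend fun withSelectiveFallback(): List<Int> =
    failingFlow()
        .catch { e -> if (e is IllegalStateException) emit(42) else throw e }
        .toList()

fun main() = runBlocking {
    println(withFallback())
    println(withSelectiveFallback())
}
```

Because `catch` only sees upstream exceptions, a failure in an operator placed after it is not intercepted, which is the exception transparency behavior the change log mentions.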
diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/LaunchInTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/LaunchInTest.kt
new file mode 100644
index 0000000..6b04b02
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/terminal/LaunchInTest.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class LaunchInTest : TestBase() {
+
+    @Test
+    fun testLaunchIn() = runTest {
+        val flow = flow {
+            expect(1)
+            emit(1)
+            throw TestException()
+        }.onEach {
+            assertEquals(1, it)
+            expect(2)
+        }.onCompletion {
+            assertTrue(it is TestException)
+            expect(3)
+        }.catch {
+            assertTrue { it is TestException }
+            expect(4)
+        }
+
+        flow.launchIn(this).join()
+        finish(5)
+    }
+
+    @Test
+    fun testDispatcher() = runTest {
+        flow {
+            assertEquals("flow", NamedDispatchers.name())
+            emit(1)
+            expect(1)
+        }.launchIn(this + NamedDispatchers("flow")).join()
+        finish(2)
+    }
+
+    @Test
+    fun testUnhandledError() = runTest(expected = { it is TestException }) {
+        flow {
+            emit(1)
+            expect(1)
+        }.catch {
+            expectUnreached()
+        }.onCompletion {
+            finish(2)
+            throw TestException()
+        }.launchIn(this)
+    }
+
+}
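Reviewer note: `launchIn` is a terminal operator that starts collection in the given scope and returns the `Job`, which is why the tests call `.join()` on it. A small sketch of that usage, assuming a recent kotlinx.coroutines release; `launched` is a hypothetical helper.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a flow in a child coroutine and waits for it to finish.
suspend fun launched(): List<Int> = coroutineScope {
    val seen = mutableListOf<Int>()
    val job = flowOf(1, 2, 3)
        .onEach { seen += it }   // the per-element action lives in onEach
        .launchIn(this)          // starts collection in this scope, returns a Job
    job.join()
    seen
}

fun main() = runBlocking {
    println(launched())
}
```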
diff --git a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
index 6638ef6..0323c73 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
@@ -161,12 +161,12 @@
 
 private fun createStackTrace(continuation: CoroutineStackFrame): ArrayDeque<StackTraceElement> {
     val stack = ArrayDeque<StackTraceElement>()
-    continuation.getStackTraceElement()?.let { stack.add(sanitize(it)) }
+    continuation.getStackTraceElement()?.let { stack.add(it) }
 
     var last = continuation
     while (true) {
         last = (last as? CoroutineStackFrame)?.callerFrame ?: break
-        last.getStackTraceElement()?.let { stack.add(sanitize(it)) }
+        last.getStackTraceElement()?.let { stack.add(it) }
     }
     return stack
 }
@@ -175,18 +175,6 @@
  * @suppress
  */
 @InternalCoroutinesApi
-public fun sanitize(element: StackTraceElement): StackTraceElement {
-    if (!element.className.contains('/')) {
-        return element
-    }
-    // KT-28237: STE generated with debug metadata contains '/' as separators in FQN, while Java contains dots
-    return StackTraceElement(element.className.replace('/', '.'), element.methodName, element.fileName, element.lineNumber)
-}
-
-/**
- * @suppress
- */
-@InternalCoroutinesApi
 public fun artificialFrame(message: String) = java.lang.StackTraceElement("\b\b\b($message", "\b", "\b", -1)
 internal fun StackTraceElement.isArtificial() = className.startsWith("\b\b\b")
 private fun Array<StackTraceElement>.frameIndex(methodName: String) = indexOfFirst { methodName == it.className }
diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
index 55ab150..bd1ba95 100644
--- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
+++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
@@ -115,7 +115,7 @@
     // fot tests only
     @Synchronized
     internal fun usePrivateScheduler() {
-        coroutineScheduler.shutdown(10_000L)
+        coroutineScheduler.shutdown(1_000L)
         coroutineScheduler = createScheduler()
     }
 
diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt
index 5c0117e..0a10913 100644
--- a/kotlinx-coroutines-core/jvm/test/TestBase.kt
+++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt
@@ -53,7 +53,7 @@
     private lateinit var threadsBefore: Set<Thread>
     private val uncaughtExceptions = Collections.synchronizedList(ArrayList<Throwable>())
     private var originalUncaughtExceptionHandler: Thread.UncaughtExceptionHandler? = null
-    private val SHUTDOWN_TIMEOUT = 10_000L // 10s at most to wait
+    private val SHUTDOWN_TIMEOUT = 1_000L // 1s at most to wait per thread
 
     /**
      * Throws [IllegalStateException] like `error` in stdlib, but also ensures that the test will not
diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md
index ba7df50..6e4d6d5 100644
--- a/kotlinx-coroutines-debug/README.md
+++ b/kotlinx-coroutines-debug/README.md
@@ -18,7 +18,7 @@
 Add `kotlinx-coroutines-debug` to your project test dependencies:
 ```
 dependencies {
-    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0-M1'
+    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.0-M2'
 }
 ```
 
@@ -57,7 +57,7 @@
 ### Using as JVM agent
 
 It is possible to use this module as a standalone JVM agent to enable debug probes on the application startup.
-You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.0-M1.jar`.
+You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.0-M2.jar`.
 Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines.
 
 
diff --git a/kotlinx-coroutines-debug/src/CoroutineInfo.kt b/kotlinx-coroutines-debug/src/CoroutineInfo.kt
index 2d4b680..56f391a 100644
--- a/kotlinx-coroutines-debug/src/CoroutineInfo.kt
+++ b/kotlinx-coroutines-debug/src/CoroutineInfo.kt
@@ -7,7 +7,6 @@
 package kotlinx.coroutines.debug
 
 import kotlinx.coroutines.*
-import kotlinx.coroutines.internal.sanitize
 import kotlin.coroutines.*
 import kotlin.coroutines.jvm.internal.*
 
@@ -64,7 +63,7 @@
         var frame: CoroutineStackFrame? = lastObservedFrame ?: return emptyList()
         val result = ArrayList<StackTraceElement>()
         while (frame != null) {
-            frame.getStackTraceElement()?.let { result.add(sanitize(it)) }
+            frame.getStackTraceElement()?.let { result.add(it) }
             frame = frame.callerFrame
         }
         return result
diff --git a/kotlinx-coroutines-debug/test/DebugTestBase.kt b/kotlinx-coroutines-debug/test/DebugTestBase.kt
index 7ce2149..3e4abea 100644
--- a/kotlinx-coroutines-debug/test/DebugTestBase.kt
+++ b/kotlinx-coroutines-debug/test/DebugTestBase.kt
@@ -12,7 +12,7 @@
 
     @JvmField
     @Rule
-    val timeout = CoroutinesTimeout.seconds(10)
+    val timeout = CoroutinesTimeout.seconds(60)
 
     @Before
     open fun setUp() {
diff --git a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt
index c990c3e..223a334 100644
--- a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt
+++ b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt
@@ -9,6 +9,7 @@
 import kotlinx.coroutines.debug.*
 import kotlinx.coroutines.selects.*
 import org.junit.*
+import org.junit.Ignore
 import org.junit.Test
 import java.util.concurrent.*
 import kotlin.test.*
diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md
index 77ed6e2..a8bbafc 100644
--- a/kotlinx-coroutines-test/README.md
+++ b/kotlinx-coroutines-test/README.md
@@ -9,7 +9,7 @@
 Add `kotlinx-coroutines-test` to your project test dependencies:
 ```
 dependencies {
-    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.0-M1'
+    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.0-M2'
 }
 ```
 
diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md
index 26df27c..5942326 100644
--- a/reactive/coroutines-guide-reactive.md
+++ b/reactive/coroutines-guide-reactive.md
@@ -292,19 +292,20 @@
 1
 2
 3
-4
 OnComplete
 Finally
+4
 5
 ```
 
 <!--- TEST -->
 
-Notice how "OnComplete" and "Finally" are printed before the last element "5". It happens because our `main` function in this
+Notice how "OnComplete" and "Finally" are printed before the last elements "4" and "5".
+It happens because our `main` function in this
 example is a coroutine that we start with the [runBlocking] coroutine builder.
 Our main coroutine receives on the flowable using the `source.collect { ... }` expression.
 The main coroutine is _suspended_ while it waits for the source to emit an item.
-When the last item is emitted by `Flowable.range(1, 5)` it
+When the last items are emitted by `Flowable.range(1, 5)` it
 _resumes_ the main coroutine, which gets dispatched onto the main thread to print this
  last element at a later point in time, while the source completes and prints "Finally".
 
diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt
index d589a0d..6cf11b7 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt
@@ -17,11 +17,11 @@
  * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
  *           See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
  *
- * @param request how many items to request from publisher in advance (optional, on-demand request by default).
+ * @param request how many items to request from publisher in advance (optional, one by default).
  */
 @ObsoleteCoroutinesApi
 @Suppress("CONFLICTING_OVERLOADS")
-public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T> {
+public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T> {
     val channel = SubscriptionChannel<T>(request)
     subscribe(channel)
     return channel
@@ -40,7 +40,7 @@
 public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
     openSubscription().consumeEach(action)
 
-@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "SubscriberImplementation")
 private class SubscriptionChannel<T>(
     private val request: Int
 ) : LinkedListChannel<T>(), Subscriber<T> {
diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
index e2b23c9..50338de 100644
--- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt
@@ -7,66 +7,123 @@
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.flow.internal.*
+import kotlinx.coroutines.reactive.*
 import org.reactivestreams.*
+import kotlin.coroutines.*
 
 /**
  * Transforms the given reactive [Publisher] into [Flow].
- * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements
- * and [Subscription.request] size.
+ * Use the [buffer] operator on the resulting flow to specify the size of the backpressure.
+ * More precisely, it specifies the value of the subscription's [request][Subscription.request].
+ * `1` is used by default.
  *
  * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
  * are discarded.
  */
-@FlowPreview
-@JvmOverloads // For nice Java API
 @JvmName("from")
-public fun <T : Any> Publisher<T>.asFlow(batchSize: Int = 1): Flow<T> =
-    PublisherAsFlow(this, batchSize)
+@ExperimentalCoroutinesApi
+public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
+    PublisherAsFlow(this, 1)
 
-private class PublisherAsFlow<T : Any>(private val publisher: Publisher<T>, private val batchSize: Int) : Flow<T> {
+@FlowPreview
+@JvmName("from")
+@Deprecated(
+    message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
+    level = DeprecationLevel.ERROR,
+    replaceWith = ReplaceWith("asFlow().buffer(batchSize)", imports = ["kotlinx.coroutines.flow.*"])
+)
+public fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T> = asFlow().buffer(batchSize)
+
+private class PublisherAsFlow<T : Any>(
+    private val publisher: Publisher<T>,
+    capacity: Int
+) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
+    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
+        PublisherAsFlow(publisher, capacity)
+
+    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
+        // use another channel for conflation (cannot do openSubscription)
+        if (capacity < 0) return super.produceImpl(scope)
+        // Open subscription channel directly
+        val channel = publisher.openSubscription(capacity)
+        val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
+            channel.cancel(cause?.let {
+                it as? CancellationException ?: CancellationException("Job was cancelled", it)
+            })
+        }
+        if (handle != null && handle !== NonDisposableHandle) {
+            (channel as SendChannel<*>).invokeOnClose {
+                handle.dispose()
+            }
+        }
+        return channel
+    }
+
+    private val requestSize: Long
+        get() = when (capacity) {
+            Channel.CONFLATED -> Long.MAX_VALUE // request all and conflate incoming
+            Channel.RENDEZVOUS -> 1L // need to request at least one anyway
+            Channel.UNLIMITED -> Long.MAX_VALUE // reactive streams way to say "give all" must be Long.MAX_VALUE
+            else -> capacity.toLong().also { check(it >= 1) }
+        }
 
     override suspend fun collect(collector: FlowCollector<T>) {
-        val channel = Channel<T>(batchSize)
-        val subscriber = ReactiveSubscriber(channel, batchSize)
+        val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
         publisher.subscribe(subscriber)
         try {
-            var consumed = 0
-            for (i in channel) {
-                collector.emit(i)
-                if (++consumed == batchSize) {
-                    consumed = 0
-                    subscriber.subscription.request(batchSize.toLong())
+            var consumed = 0L
+            while (true) {
+                val value = subscriber.takeNextOrNull() ?: break
+                collector.emit(value)
+                if (++consumed == requestSize) {
+                    consumed = 0L
+                    subscriber.makeRequest()
                 }
             }
         } finally {
-            subscriber.subscription.cancel()
+            subscriber.cancel()
         }
     }
 
-    @Suppress("SubscriberImplementation")
-    private class ReactiveSubscriber<T : Any>(
-        private val channel: Channel<T>,
-        private val batchSize: Int
-    ) : Subscriber<T> {
+    // The second channel here is used only for broadcast
+    override suspend fun collectTo(scope: ProducerScope<T>) =
+        collect(SendingCollector(scope.channel))
+}
 
-        lateinit var subscription: Subscription
+@Suppress("SubscriberImplementation")
+private class ReactiveSubscriber<T : Any>(
+    capacity: Int,
+    private val requestSize: Long
+) : Subscriber<T> {
+    private lateinit var subscription: Subscription
+    private val channel = Channel<T>(capacity)
 
-        override fun onComplete() {
-            channel.close()
-        }
+    suspend fun takeNextOrNull(): T? = channel.receiveOrNull()
 
-        override fun onSubscribe(s: Subscription) {
-            subscription = s
-            s.request(batchSize.toLong())
-        }
+    override fun onNext(value: T) {
+        // Controlled by requestSize
+        require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" }
+    }
 
-        override fun onNext(t: T) {
-            // Controlled by batch-size
-            require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
-        }
+    override fun onComplete() {
+        channel.close()
+    }
 
-        override fun onError(t: Throwable?) {
-            channel.close(t)
-        }
+    override fun onError(t: Throwable?) {
+        channel.close(t)
+    }
+
+    override fun onSubscribe(s: Subscription) {
+        subscription = s
+        makeRequest()
+    }
+
+    fun makeRequest() {
+        subscription.request(requestSize)
+    }
+
+    fun cancel() {
+        subscription.cancel()
     }
 }
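Reviewer note: the `requestSize` property above maps the channel capacity chosen through `buffer` or `conflate` onto a Reactive Streams request size. The standalone sketch below restates that mapping so it can be checked without the library. The three constants are local stand-ins that assume the values of the corresponding `kotlinx.coroutines.channels.Channel` constants, and `requestSizeFor` is a hypothetical name.

```kotlin
// Local stand-ins for Channel.RENDEZVOUS, Channel.CONFLATED and Channel.UNLIMITED (assumed values).
const val RENDEZVOUS = 0
const val CONFLATED = -1
const val UNLIMITED = Int.MAX_VALUE

// Mirrors the `when` in PublisherAsFlow.requestSize from the diff above.
fun requestSizeFor(capacity: Int): Long = when (capacity) {
    CONFLATED -> Long.MAX_VALUE   // request everything and conflate incoming elements
    RENDEZVOUS -> 1L              // at least one element must be requested
    UNLIMITED -> Long.MAX_VALUE   // Reactive Streams convention for an unbounded request
    else -> capacity.toLong().also { check(it >= 1) { "capacity must be positive: $it" } }
}

fun main() {
    println(requestSizeFor(RENDEZVOUS))
    println(requestSizeFor(10))
    println(requestSizeFor(CONFLATED) == Long.MAX_VALUE)
}
```

Under this mapping, `asFlow()` without a `buffer` call requests one element at a time, while `asFlow().buffer(10)` requests in batches of ten.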
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
index 6c3501d..74f5914 100644
--- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt
@@ -5,12 +5,12 @@
 package kotlinx.coroutines.reactive.flow
 
 import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
 import kotlinx.coroutines.flow.*
 import kotlinx.coroutines.reactive.*
 import kotlin.test.*
 
 class PublisherAsFlowTest : TestBase() {
-
     @Test
     fun testCancellation() = runTest {
         var onNext = 0
@@ -42,4 +42,108 @@
         assertEquals(1, onError)
         assertEquals(1, onCancelled)
     }
+
+    @Test
+    fun testBufferSize1() = runTest {
+        val publisher = publish {
+            expect(1)
+            send(3)
+
+            expect(2)
+            send(5)
+
+            expect(4)
+            send(7)
+            expect(6)
+        }
+
+        publisher.asFlow().collect {
+            expect(it)
+        }
+
+        finish(8)
+    }
+
+    @Test
+    fun testBufferSize10() = runTest {
+        val publisher = publish {
+            expect(1)
+            send(5)
+
+            expect(2)
+            send(6)
+
+            expect(3)
+            send(7)
+            expect(4)
+        }
+
+        publisher.asFlow().buffer(10).collect {
+            expect(it)
+        }
+
+        finish(8)
+    }
+
+    @Test
+    fun testConflated() = runTest {
+        val publisher = publish {
+            for (i in 1..5) send(i)
+        }
+        val list = publisher.asFlow().conflate().toList()
+        assertEquals(listOf(1, 5), list)
+    }
+
+    @Test
+    fun testProduce() = runTest {
+        val flow = publish { repeat(10) { send(it) } }.asFlow()
+        check((0..9).toList(), flow.produceIn(this))
+        check((0..9).toList(), flow.buffer(2).produceIn(this))
+        check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
+        check(listOf(0, 9), flow.conflate().produceIn(this))
+    }
+
+    private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
+        val result = ArrayList<Int>(10)
+        channel.consumeEach { result.add(it) }
+        assertEquals(expected, result)
+    }
+
+    @Test
+    fun testProduceCancellation() = runTest {
+        expect(1)
+        // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled
+        val flow = publish {
+            expect(3)
+            repeat(10) { value ->
+                when (value) {
+                    in 0..6 -> send(value)
+                    7 -> try {
+                        send(value)
+                    } catch (e: CancellationException) {
+                        finish(6)
+                        throw e
+                    }
+                    else -> expectUnreached()
+                }
+            }
+        }.asFlow()
+        assertFailsWith<TestException> {
+            coroutineScope {
+                expect(2)
+                val channel = flow.produceIn(this)
+                channel.consumeEach { value ->
+                    when (value) {
+                        in 0..4 -> {}
+                        5 -> {
+                            expect(4)
+                            throw TestException()
+                        }
+                        else -> expectUnreached()
+                    }
+                }
+            }
+        }
+        expect(5)
+    }
 }
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
new file mode 100644
index 0000000..2ff96eb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive.flow
+
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import org.reactivestreams.*
+import org.reactivestreams.example.unicast.*
+import org.reactivestreams.tck.*
+
+class RangePublisherBufferedTest :
+    PublisherVerification<Int>(TestEnvironment(50, 50))
+{
+    override fun createPublisher(elements: Long): Publisher<Int> {
+        return RangePublisher(1, elements.toInt()).asFlow().buffer(2).asPublisher()
+    }
+
+    override fun createFailedPublisher(): Publisher<Int>? {
+        return null
+    }
+
+    @Ignore
+    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
+    }
+
+    @Ignore
+    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
+    }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
new file mode 100644
index 0000000..120cd72
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import kotlinx.coroutines.reactive.flow.*
+import org.junit.Test
+import reactor.core.publisher.*
+import kotlin.test.*
+
+class BackpressureTest : TestBase() {
+    @Test
+    fun testBackpressureDropDirect() = runTest {
+        expect(1)
+        Flux.fromArray(arrayOf(1))
+            .onBackpressureDrop()
+            .collect {
+                assertEquals(1, it)
+                expect(2)
+            }
+        finish(3)
+    }
+
+    @Test
+    fun testBackpressureDropFlow() = runTest {
+        expect(1)
+        Flux.fromArray(arrayOf(1))
+            .onBackpressureDrop()
+            .asFlow()
+            .collect {
+                assertEquals(1, it)
+                expect(2)
+            }
+        finish(3)
+    }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
new file mode 100644
index 0000000..1904334
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import kotlinx.coroutines.reactive.flow.*
+import org.junit.Test
+import kotlin.test.*
+
+class BackpressureTest : TestBase() {
+    @Test
+    fun testBackpressureDropDirect() = runTest {
+        expect(1)
+        Flowable.fromArray(1)
+            .onBackpressureDrop()
+            .collect {
+                assertEquals(1, it)
+                expect(2)
+            }
+        finish(3)
+    }
+
+    @Test
+    fun testBackpressureDropFlow() = runTest {
+        expect(1)
+        Flowable.fromArray(1)
+            .onBackpressureDrop()
+            .asFlow()
+            .collect {
+                assertEquals(1, it)
+                expect(2)
+            }
+        finish(3)
+    }
+}
\ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt b/reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt
index 65ac3e6..cebfc7b 100644
--- a/reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt
@@ -52,9 +52,9 @@
             "1",
             "2",
             "3",
-            "4",
             "OnComplete",
             "Finally",
+            "4",
             "5"
         )
     }
diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md
index 7dcf907..d6e848b 100644
--- a/ui/coroutines-guide-ui.md
+++ b/ui/coroutines-guide-ui.md
@@ -165,7 +165,7 @@
 `app/build.gradle` file:
 
 ```groovy
-implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-M1"
+implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-M2"
 ```
 
 You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your 
diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
index ab28563..ed89e47 100644
--- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties
@@ -18,6 +18,6 @@
 
 kotlin.coroutines=enable
 
-kotlin_version=1.3.31
-coroutines_version=1.3.0-M1
+kotlin_version=1.3.40
+coroutines_version=1.3.0-M2
 
diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties
index ab28563..ed89e47 100644
--- a/ui/kotlinx-coroutines-android/example-app/gradle.properties
+++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties
@@ -18,6 +18,6 @@
 
 kotlin.coroutines=enable
 
-kotlin_version=1.3.31
-coroutines_version=1.3.0-M1
+kotlin_version=1.3.40
+coroutines_version=1.3.0-M2