Decouple asFlow from batchSize and move it to buffer instead, promote… (#1279)

* Optimize Publisher.asFlow and fix conflation
* Use channel.receiveOrNull instead of for loop iteration
  (it is more efficient)
* Calling Publisher.asFlow().produceIn(...) uses a single channel
  and is implemented via Publisher.openSubscription()
diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
new file mode 100644
index 0000000..2ff96eb
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactive.flow
+
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import org.reactivestreams.*
+import org.reactivestreams.example.unicast.*
+import org.reactivestreams.tck.*
+
+/**
+ * Runs the [PublisherVerification] suite against a [RangePublisher] that is converted
+ * to a flow, passed through `buffer(2)`, and converted back into a [Publisher].
+ */
+class RangePublisherBufferedTest : PublisherVerification<Int>(TestEnvironment(50, 50)) {
+    /** Builds the publisher under test from `RangePublisher(1, elements.toInt())`. */
+    override fun createPublisher(elements: Long): Publisher<Int> =
+        RangePublisher(1, elements.toInt()).asFlow().buffer(2).asPublisher()
+
+    /** Always `null`; presumably this tells the TCK to skip failed-publisher checks (confirm). */
+    override fun createFailedPublisher(): Publisher<Int>? = null
+
+    // Both required_spec309_* checks are overridden with empty bodies and marked @Ignore.
+    @Ignore
+    override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {}
+
+    @Ignore
+    override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {}
+}
\ No newline at end of file