Fixed Publisher/Observable/Flowable.openSubscription in presence of selects;
support an optional `request` parameter in openSubscription to specify
how many elements are requested from publisher in advance on subscription.

Fixes #197
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
index 1a6b7c0..ad87de9 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxChannel.kt
@@ -27,9 +27,11 @@
 /**
  * Subscribes to this [Observable] and returns a channel to receive elements emitted by it.
  * The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this observable.
+ * @param request how many items to request from publisher in advance (optional, on-demand request by default).
  */
-public fun <T> Observable<T>.openSubscription(): SubscriptionReceiveChannel<T> {
-    val channel = SubscriptionChannel<T>()
+@JvmOverloads // for binary compatibility
+public fun <T> Observable<T>.openSubscription(request: Int = 0): SubscriptionReceiveChannel<T> {
+    val channel = SubscriptionChannel<T>(request)
     val subscription = subscribe(channel.subscriber)
     channel.subscription = subscription
     if (channel.isClosedForSend) subscription.unsubscribe()
@@ -59,7 +61,7 @@
 /**
  * Subscribes to this [Observable] and performs the specified action for each received element.
  */
-public inline suspend fun <T> Observable<T>.consumeEach(action: (T) -> Unit) {
+public suspend inline fun <T> Observable<T>.consumeEach(action: (T) -> Unit) {
     openSubscription().use { channel ->
         for (x in channel) action(x)
     }
@@ -72,45 +74,58 @@
 public suspend fun <T> Observable<T>.consumeEach(action: suspend (T) -> Unit) =
     consumeEach { action(it) }
 
-private class SubscriptionChannel<T> : LinkedListChannel<T>(), SubscriptionReceiveChannel<T> {
+private class SubscriptionChannel<T>(
+    private val request: Int
+) : LinkedListChannel<T>(), SubscriptionReceiveChannel<T> {
+    init {
+        require(request >= 0) { "Invalid request size: $request" }
+    }
+
     @JvmField
-    val subscriber: ChannelSubscriber = ChannelSubscriber()
+    val subscriber: ChannelSubscriber = ChannelSubscriber(request)
 
     @Volatile
     @JvmField
     var subscription: Subscription? = null
 
-    val _balance = atomic(0) // request balance from cancelled receivers
+    // requested from subscription minus number of received minus number of enqueued receivers,
+    private val _requested = atomic(request)
 
     // AbstractChannel overrides
-    override fun onEnqueuedReceive() {
-        _balance.loop { balance ->
-            if (balance == 0) {
-                subscriber.requestOne()
+    override fun onReceiveEnqueued() {
+        _requested.loop { wasRequested ->
+            val needRequested = wasRequested - 1
+            if (needRequested < 0) { // need to request more from subscriber
+                // try to fixup by making request
+                if (wasRequested != request && !_requested.compareAndSet(wasRequested, request))
+                    return@loop // continue looping if failed
+                subscriber.makeRequest((request - needRequested).toLong())
                 return
             }
-            if (_balance.compareAndSet(balance, balance - 1)) return
+            // just do book-keeping
+            if (_requested.compareAndSet(wasRequested, needRequested)) return
         }
     }
 
-    override fun onCancelledReceive() {
-        _balance.incrementAndGet()
+    override fun onReceiveDequeued() {
+        _requested.incrementAndGet()
     }
 
     override fun afterClose(cause: Throwable?) {
         subscription?.unsubscribe()
     }
 
-    inner class ChannelSubscriber: Subscriber<T>() {
-        fun requestOne() {
-            request(1)
+    inner class ChannelSubscriber(private val request: Int): Subscriber<T>() {
+        fun makeRequest(n: Long) {
+            request(n)
         }
 
         override fun onStart() {
-            request(0) // init backpressure, but don't request anything yet
+            request(request.toLong()) // init backpressure
         }
 
         override fun onNext(t: T) {
+            _requested.decrementAndGet()
             offer(t)
         }