Improve ReceiveChannel operators implementations to guarantee closing
of the source channels under all circumstances;
`onCompletion` added to `produce` builder;
`ReceiveChannel.consumes(): CompletionHandler` extension fun.

Fixes #279
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
index 61e6a6f..1565ecd 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt
@@ -70,17 +70,20 @@
  * @param context context of the coroutine. The default value is [DefaultDispatcher].
  * @param capacity capacity of the channel's buffer (no buffer by default).
  * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
+ * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
  * @param block the coroutine code.
  */
 public fun <E> produce(
     context: CoroutineContext = DefaultDispatcher,
     capacity: Int = 0,
     parent: Job? = null,
+    onCompletion: CompletionHandler? = null,
     block: suspend ProducerScope<E>.() -> Unit
 ): ReceiveChannel<E> {
     val channel = Channel<E>(capacity)
     val newContext = newCoroutineContext(context, parent)
     val coroutine = ProducerCoroutine(newContext, channel)
+    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
     return coroutine
 }
@@ -90,6 +93,15 @@
 public fun <E> produce(
     context: CoroutineContext = DefaultDispatcher,
     capacity: Int = 0,
+    parent: Job? = null,
+    block: suspend ProducerScope<E>.() -> Unit
+): ReceiveChannel<E> = produce(context, capacity, parent, block = block)
+
+/** @suppress **Deprecated**: Binary compatibility */
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> produce(
+    context: CoroutineContext = DefaultDispatcher,
+    capacity: Int = 0,
     block: suspend ProducerScope<E>.() -> Unit
 ): ProducerJob<E> =
     produce(context, capacity, block = block) as ProducerJob<E>