Review and optimize usage of CancellableContinuation.invokeOnCancellation
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
index 218c3ee..26a59ce 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
@@ -74,12 +74,17 @@
             it.start() // To properly await lazily started deferreds
             it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler)
         }
-        cont.invokeOnCancellation {
-            handlers.forEach { it.dispose() }
-        }
+        cont.invokeOnCancellation(handler = DisposeHandlersOnCancel(handlers).asHandler)
     }
 
-    inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
+    private class DisposeHandlersOnCancel(private val handlers: List<DisposableHandle>) : CancelHandler() {
+        override fun invoke(cause: Throwable?) {
+            handlers.forEach { it.dispose() }
+        }
+        override fun toString(): String = "DisposeHandlersOnCancel[$handlers]"
+    }
+
+    private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
         override fun invoke(cause: Throwable?) {
             if (cause != null) {
                 val token = continuation.tryResumeWithException(cause)
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index a5889c4..bb8ce69 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -769,11 +769,15 @@
 
     // ------ private ------
 
-    private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
-        cont.invokeOnCancellation {
+    private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
+        cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
+
+    private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : CancelHandler() {
+        override fun invoke(cause: Throwable?) {
             if (receive.remove())
                 onReceiveDequeued()
         }
+        override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
     }
 
     private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
diff --git a/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt b/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt
index fdcdee3..1d002ac 100644
--- a/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt
+++ b/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt
@@ -137,12 +137,12 @@
 
 private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) {
     cont.invokeOnCancellation {
-            try {
-                close()
-            } catch (ex: Throwable) {
-                // Specification says that it is Ok to call it any time, but reality is different,
-                // so we have just to ignore exception
-            }
+        try {
+            close()
+        } catch (ex: Throwable) {
+            // Specification says that it is Ok to call it any time, but reality is different,
+            // so we have just to ignore exception
+        }
     }
 }
 
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
index 926ddaa..f9f1f62 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
@@ -28,16 +28,18 @@
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
         val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toIntMillis(unit))
         // Actually on cancellation, but clearTimeout is idempotent
-        continuation.invokeOnCancellation { clearTimeout(handle) }
+        continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
+    }
+
+    private class ClearTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {
+        override fun dispose() { clearTimeout(handle) }
+        override fun invoke(cause: Throwable?) { dispose() }
+        override fun toString(): String = "ClearTimeout[$handle]"
     }
 
     override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
         val handle = setTimeout({ block.run() }, time.toIntMillis(unit))
-        return object : DisposableHandle {
-            override fun dispose() {
-                clearTimeout(handle)
-            }
-        }
+        return ClearTimeout(handle)
     }
 }
 
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt
index f283ba0..0feef61 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt
@@ -120,15 +120,15 @@
                 Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                     if (!seenValue) {
                         seenValue = true
-                        cont.resume(t)
                         subscription.cancel()
+                        cont.resume(t)
                     }
                 }
                 Mode.LAST, Mode.SINGLE -> {
                     if (mode == Mode.SINGLE && seenValue) {
+                        subscription.cancel()
                         if (cont.isActive)
                             cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
-                        subscription.cancel()
                     } else {
                         value = t
                         seenValue = true
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt
index 0a567a7..9838fda 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt
@@ -33,7 +33,7 @@
  */
 public suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
     subscribe(object : CompletableSubscriber {
-        override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCompletion(s) }
+        override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCancellation(s) }
         override fun onCompleted() { cont.resume(Unit) }
         override fun onError(e: Throwable) { cont.resumeWithException(e) }
     })
@@ -50,7 +50,7 @@
  * immediately resumes with [CancellationException].
  */
 public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
-    cont.unsubscribeOnCompletion(subscribe(object : SingleSubscriber<T>() {
+    cont.unsubscribeOnCancellation(subscribe(object : SingleSubscriber<T>() {
         override fun onSuccess(t: T) { cont.resume(t) }
         override fun onError(error: Throwable) { cont.resumeWithException(error) }
     }))
@@ -128,7 +128,7 @@
 // ------------------------ private ------------------------
 
 private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
-    cont.unsubscribeOnCompletion(subscribe(object : Subscriber<T>() {
+    cont.unsubscribeOnCancellation(subscribe(object : Subscriber<T>() {
         override fun onStart() { request(1) }
         override fun onNext(t: T) { cont.resume(t) }
         override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) }
@@ -136,6 +136,5 @@
     }))
 }
 
-internal fun <T> CancellableContinuation<T>.unsubscribeOnCompletion(sub: Subscription) {
+internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
     invokeOnCancellation { sub.unsubscribe() }
-}
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt
index 5cf6936..1a4d2cd 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt
@@ -46,7 +46,7 @@
                     with(continuation) { resumeUndispatched(Unit) }
                  }, time, unit)
                 .let { subscription ->
-                    continuation.unsubscribeOnCompletion(subscription)
+                    continuation.unsubscribeOnCancellation(subscription)
                 }
 
     override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
index d0a5167..29e265a 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
@@ -34,7 +34,7 @@
  */
 public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
     subscribe(object : CompletableObserver {
-        override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
+        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
         override fun onComplete() { cont.resume(Unit) }
         override fun onError(e: Throwable) { cont.resumeWithException(e) }
     })
@@ -65,7 +65,7 @@
  */
 public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
     subscribe(object : MaybeObserver<T> {
-        override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
+        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
         override fun onComplete() { cont.resume(default) }
         override fun onSuccess(t: T) { cont.resume(t) }
         override fun onError(error: Throwable) { cont.resumeWithException(error) }
@@ -84,7 +84,7 @@
  */
 public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
     subscribe(object : SingleObserver<T> {
-        override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
+        override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
         override fun onSuccess(t: T) { cont.resume(t) }
         override fun onError(error: Throwable) { cont.resumeWithException(error) }
     })
@@ -161,7 +161,7 @@
 
 // ------------------------ private ------------------------
 
-internal fun CancellableContinuation<*>.disposeOnCompletion(d: Disposable) =
+internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
     invokeOnCancellation { d.dispose() }
 
 private enum class Mode(val s: String) {
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt
index 5d5e754..46e0c11 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt
@@ -43,7 +43,7 @@
         val disposable = scheduler.scheduleDirect({
             with(continuation) { resumeUndispatched(Unit) }
         }, time, unit)
-        continuation.disposeOnCompletion(disposable)
+        continuation.disposeOnCancellation(disposable)
     }
 
     override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {