Review and optimize usage of CancellableContinuation.invokeOnCancellation
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
index 218c3ee..26a59ce 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Await.kt
@@ -74,12 +74,17 @@
it.start() // To properly await lazily started deferreds
it.invokeOnCompletion(AwaitAllNode(cont, it).asHandler)
}
- cont.invokeOnCancellation {
- handlers.forEach { it.dispose() }
- }
+ cont.invokeOnCancellation(handler = DisposeHandlersOnCancel(handlers).asHandler)
}
- inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
+ private class DisposeHandlersOnCancel(private val handlers: List<DisposableHandle>) : CancelHandler() {
+ override fun invoke(cause: Throwable?) {
+ handlers.forEach { it.dispose() }
+ }
+ override fun toString(): String = "DisposeHandlersOnCancel[$handlers]"
+ }
+
+ private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
override fun invoke(cause: Throwable?) {
if (cause != null) {
val token = continuation.tryResumeWithException(cause)
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index a5889c4..bb8ce69 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -769,11 +769,15 @@
// ------ private ------
- private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) {
- cont.invokeOnCancellation {
+ private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
+ cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
+
+ private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : CancelHandler() {
+ override fun invoke(cause: Throwable?) {
if (receive.remove())
onReceiveDequeued()
}
+ override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
}
private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
diff --git a/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt b/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt
index fdcdee3..1d002ac 100644
--- a/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt
+++ b/integration/kotlinx-coroutines-nio/src/main/kotlin/kotlinx/coroutines/experimental/nio/Nio.kt
@@ -137,12 +137,12 @@
private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) {
cont.invokeOnCancellation {
- try {
- close()
- } catch (ex: Throwable) {
- // Specification says that it is Ok to call it any time, but reality is different,
- // so we have just to ignore exception
- }
+ try {
+ close()
+ } catch (ex: Throwable) {
+ // Specification says that it is Ok to call it any time, but reality is different,
+ // so we have just to ignore exception
+ }
}
}
diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
index 926ddaa..f9f1f62 100644
--- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
+++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/JSDispatcher.kt
@@ -28,16 +28,18 @@
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toIntMillis(unit))
// Actually on cancellation, but clearTimeout is idempotent
- continuation.invokeOnCancellation { clearTimeout(handle) }
+ continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler)
+ }
+
+ private class ClearTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {
+ override fun dispose() { clearTimeout(handle) }
+ override fun invoke(cause: Throwable?) { dispose() }
+ override fun toString(): String = "ClearTimeout[$handle]"
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
val handle = setTimeout({ block.run() }, time.toIntMillis(unit))
- return object : DisposableHandle {
- override fun dispose() {
- clearTimeout(handle)
- }
- }
+ return ClearTimeout(handle)
}
}
diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt
index f283ba0..0feef61 100644
--- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Await.kt
@@ -120,15 +120,15 @@
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
seenValue = true
- cont.resume(t)
subscription.cancel()
+ cont.resume(t)
}
}
Mode.LAST, Mode.SINGLE -> {
if (mode == Mode.SINGLE && seenValue) {
+ subscription.cancel()
if (cont.isActive)
cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
- subscription.cancel()
} else {
value = t
seenValue = true
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt
index 0a567a7..9838fda 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxAwait.kt
@@ -33,7 +33,7 @@
*/
public suspend fun Completable.awaitCompleted(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableSubscriber {
- override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCompletion(s) }
+ override fun onSubscribe(s: Subscription) { cont.unsubscribeOnCancellation(s) }
override fun onCompleted() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
})
@@ -50,7 +50,7 @@
* immediately resumes with [CancellationException].
*/
public suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
- cont.unsubscribeOnCompletion(subscribe(object : SingleSubscriber<T>() {
+ cont.unsubscribeOnCancellation(subscribe(object : SingleSubscriber<T>() {
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
}))
@@ -128,7 +128,7 @@
// ------------------------ private ------------------------
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
- cont.unsubscribeOnCompletion(subscribe(object : Subscriber<T>() {
+ cont.unsubscribeOnCancellation(subscribe(object : Subscriber<T>() {
override fun onStart() { request(1) }
override fun onNext(t: T) { cont.resume(t) }
override fun onCompleted() { if (cont.isActive) cont.resumeWithException(IllegalStateException("Should have invoked onNext")) }
@@ -136,6 +136,5 @@
}))
}
-internal fun <T> CancellableContinuation<T>.unsubscribeOnCompletion(sub: Subscription) {
+internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
invokeOnCancellation { sub.unsubscribe() }
-}
diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt
index 5cf6936..1a4d2cd 100644
--- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt
+++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxScheduler.kt
@@ -46,7 +46,7 @@
with(continuation) { resumeUndispatched(Unit) }
}, time, unit)
.let { subscription ->
- continuation.unsubscribeOnCompletion(subscription)
+ continuation.unsubscribeOnCancellation(subscription)
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
index d0a5167..29e265a 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxAwait.kt
@@ -34,7 +34,7 @@
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
- override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(Unit) }
override fun onError(e: Throwable) { cont.resumeWithException(e) }
})
@@ -65,7 +65,7 @@
*/
public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
subscribe(object : MaybeObserver<T> {
- override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onComplete() { cont.resume(default) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
@@ -84,7 +84,7 @@
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
- override fun onSubscribe(d: Disposable) { cont.disposeOnCompletion(d) }
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
override fun onSuccess(t: T) { cont.resume(t) }
override fun onError(error: Throwable) { cont.resumeWithException(error) }
})
@@ -161,7 +161,7 @@
// ------------------------ private ------------------------
-internal fun CancellableContinuation<*>.disposeOnCompletion(d: Disposable) =
+internal fun CancellableContinuation<*>.disposeOnCancellation(d: Disposable) =
invokeOnCancellation { d.dispose() }
private enum class Mode(val s: String) {
diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt
index 5d5e754..46e0c11 100644
--- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxScheduler.kt
@@ -43,7 +43,7 @@
val disposable = scheduler.scheduleDirect({
with(continuation) { resumeUndispatched(Unit) }
}, time, unit)
- continuation.disposeOnCompletion(disposable)
+ continuation.disposeOnCancellation(disposable)
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {