Deferred.getCompleted() and an efficient implementation of Deferred.toCompletableFuture() extension
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index f0fe557..27cb49e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -59,7 +59,7 @@
val parentContext: CoroutineContext
) : JobContinuation<Unit>(parentContext) {
override fun afterCompletion(state: Any?, closeException: Throwable?) {
- if (closeException != null) handleCoroutineException(context, closeException)
+ super.afterCompletion(state, closeException) // handle closeException
// note the use of the parent context below!
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
}
@@ -69,7 +69,7 @@
val blockedThread: Thread = Thread.currentThread()
override fun afterCompletion(state: Any?, closeException: Throwable?) {
- if (closeException != null) handleCoroutineException(context, closeException)
+ super.afterCompletion(state, closeException) // handle closeException
LockSupport.unpark(blockedThread)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 397d590..25a9eb3 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -47,7 +47,7 @@
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?, closeException: Throwable?) {
- if (closeException != null) handleCoroutineException(context, closeException)
+ super.afterCompletion(state, closeException) // handle closeException
if (suspendedThread === Thread.currentThread()) {
// cancelled during suspendCancellableCoroutine in its thread
suspendedThread = null
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 4c0c5e3..0b36030 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -12,9 +12,17 @@
* Awaits for completion of this value without blocking a thread and resumes when deferred computation is complete.
* This suspending function is cancellable.
* If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException] .
+ * immediately resumes with [CancellationException].
*/
public suspend fun await(): T
+
+ /**
+ * Returns *completed* result or throws [IllegalStateException] if this deferred value is still [isActive].
+ * It throws the corresponding exception if this deferred has completed exceptionally.
+ * This function is designed to be used from [onCompletion] handlers, when there is an absolute certainty that
+ * the value is already complete.
+ */
+ public fun getCompleted(): T
}
/**
@@ -54,7 +62,11 @@
})
}
- override fun afterCompletion(state: Any?, closeException: Throwable?) {
- if (closeException != null) handleCoroutineException(context, closeException)
+ @Suppress("UNCHECKED_CAST")
+ override fun getCompleted(): T {
+ val state = getState()
+ check(state !is Active) { "This deferred value is still active" }
+ if (state is CompletedExceptionally) throw state.exception
+ return state as T
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt
index 497de91..1ef250d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt
@@ -35,4 +35,8 @@
}
}
}
+
+ override fun afterCompletion(state: Any?, closeException: Throwable?) {
+ if (closeException != null) handleCoroutineException(context, closeException)
+ }
}
diff --git a/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt b/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
index 8d27647..b964425 100644
--- a/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
+++ b/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
@@ -35,9 +35,8 @@
val future = CompletableFuture<T>()
future.whenComplete { _, exception -> cancel(exception) }
onCompletion {
- // todo: write better (more efficient) implementation, because we know that await will not suspend
try {
- future.complete(runBlocking(Job() + Here) { await() })
+ future.complete(getCompleted())
} catch (exception: Exception) {
future.completeExceptionally(exception)
}