blob: b67d8605cf4039b2d944dbc13b40d122ea16518c [file] [log] [blame]
Roman Elizarovf106ff32017-05-17 12:22:14 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.guava
18
Roman Elizarova4b56932018-03-13 17:59:48 +030019import com.google.common.util.concurrent.*
Roman Elizarovf106ff32017-05-17 12:22:14 +030020import kotlinx.coroutines.experimental.*
Vsevolod Tolstopyatov20dbd9f2018-05-07 20:08:20 +030021import java.util.concurrent.*
Roman Elizarova4b56932018-03-13 17:59:48 +030022import kotlin.coroutines.experimental.*
Roman Elizarovf106ff32017-05-17 12:22:14 +030023
24/**
25 * Starts new coroutine and returns its results an an implementation of [ListenableFuture].
26 * This coroutine builder uses [CommonPool] context by default.
27 *
28 * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
Roman Elizarovc0d559b2017-09-28 14:27:05 +030029 *
30 * The [context] for the new coroutine can be explicitly specified.
31 * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
Roman Elizarovc7d10a42018-03-13 18:28:42 +030032 * The [coroutineContext] of the parent coroutine may be used,
Roman Elizarovf106ff32017-05-17 12:22:14 +030033 * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
Roman Elizarove8f694e2017-11-28 10:12:00 +030034 * The parent job may be also explicitly specified using [parent] parameter.
35 *
Roman Elizarovc0d559b2017-09-28 14:27:05 +030036 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
Roman Elizarovf106ff32017-05-17 12:22:14 +030037 *
38 * By default, the coroutine is immediately scheduled for execution.
39 * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
40 * A value of [CoroutineStart.LAZY] is not supported
41 * (since `ListenableFuture` framework does not provide the corresponding capability) and
42 * produces [IllegalArgumentException].
43 *
44 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
45 *
Roman Elizarovc0d559b2017-09-28 14:27:05 +030046 * @param context context of the coroutine. The default value is [DefaultDispatcher].
47 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
Roman Elizarove8f694e2017-11-28 10:12:00 +030048 * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
Roman Elizarova4b56932018-03-13 17:59:48 +030049 * @param onCompletion optional completion handler for the coroutine (see [Job.invokeOnCompletion]).
Roman Elizarovc0d559b2017-09-28 14:27:05 +030050 * @param block the coroutine code.
Roman Elizarovf106ff32017-05-17 12:22:14 +030051 */
52public fun <T> future(
Roman Elizarovc0d559b2017-09-28 14:27:05 +030053 context: CoroutineContext = DefaultDispatcher,
Roman Elizarovf106ff32017-05-17 12:22:14 +030054 start: CoroutineStart = CoroutineStart.DEFAULT,
Roman Elizarove8f694e2017-11-28 10:12:00 +030055 parent: Job? = null,
Roman Elizarova4b56932018-03-13 17:59:48 +030056 onCompletion: CompletionHandler? = null,
Roman Elizarovf106ff32017-05-17 12:22:14 +030057 block: suspend CoroutineScope.() -> T
58): ListenableFuture<T> {
59 require(!start.isLazy) { "$start start is not supported" }
Roman Elizarove8f694e2017-11-28 10:12:00 +030060 val newContext = newCoroutineContext(context, parent)
Roman Elizarovf106ff32017-05-17 12:22:14 +030061 val job = Job(newContext[Job])
62 val future = ListenableFutureCoroutine<T>(newContext + job)
63 job.cancelFutureOnCompletion(future)
Roman Elizarova4b56932018-03-13 17:59:48 +030064 if (onCompletion != null) job.invokeOnCompletion(handler = onCompletion)
Roman Elizarovf106ff32017-05-17 12:22:14 +030065 start(block, receiver=future, completion=future) // use the specified start strategy
66 return future
67}
68
Roman Elizarove8f694e2017-11-28 10:12:00 +030069/** @suppress **Deprecated**: Binary compatibility */
70@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
71public fun <T> future(
72 context: CoroutineContext = DefaultDispatcher,
73 start: CoroutineStart = CoroutineStart.DEFAULT,
Roman Elizarova4b56932018-03-13 17:59:48 +030074 parent: Job? = null,
75 block: suspend CoroutineScope.() -> T
76): ListenableFuture<T> = future(context, start, parent, block = block)
77
78/** @suppress **Deprecated**: Binary compatibility */
79@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
80public fun <T> future(
81 context: CoroutineContext = DefaultDispatcher,
82 start: CoroutineStart = CoroutineStart.DEFAULT,
Roman Elizarove8f694e2017-11-28 10:12:00 +030083 block: suspend CoroutineScope.() -> T
84): ListenableFuture<T> =
85 future(context, start, block = block)
86
Roman Elizarovf106ff32017-05-17 12:22:14 +030087private class ListenableFutureCoroutine<T>(
88 override val context: CoroutineContext
89) : AbstractFuture<T>(), Continuation<T>, CoroutineScope {
Roman Elizarov43e3af72017-07-21 16:01:31 +030090 override val coroutineContext: CoroutineContext get() = context
Roman Elizarovf106ff32017-05-17 12:22:14 +030091 override val isActive: Boolean get() = context[Job]!!.isActive
92 override fun resume(value: T) { set(value) }
93 override fun resumeWithException(exception: Throwable) { setException(exception) }
94 override fun interruptTask() { context[Job]!!.cancel() }
95}
96
97/**
98 * Converts this deferred value to the instance of [ListenableFuture].
99 * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
100 */
101public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> = DeferredListenableFuture<T>(this)
102
103private class DeferredListenableFuture<T>(
104 private val deferred: Deferred<T>
105) : AbstractFuture<T>() {
106 init {
107 deferred.invokeOnCompletion {
108 try {
109 set(deferred.getCompleted())
110 } catch (exception: Exception) {
111 setException(exception)
112 }
113 }
114 }
115 override fun interruptTask() { deferred.cancel() }
116}
117
118/**
119 * Awaits for completion of the future without blocking a thread.
120 *
121 * This suspending function is cancellable.
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300122 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
Roman Elizarovf106ff32017-05-17 12:22:14 +0300123 * stops waiting for the future and immediately resumes with [CancellationException].
124 *
125 * Note, that `ListenableFuture` does not support removal of installed listeners, so on cancellation of this wait
126 * a few small objects will remain in the `ListenableFuture` list of listeners until the future completes. However, the
127 * care is taken to clear the reference to the waiting coroutine itself, so that its memory can be released even if
128 * the future never completes.
129 */
Vsevolod Tolstopyatov20dbd9f2018-05-07 20:08:20 +0300130public suspend fun <T> ListenableFuture<T>.await(): T {
131 try {
132 if (isDone) return get() as T
133 } catch (e: ExecutionException) {
134 throw e.cause ?: e // unwrap original cause from ExecutionException
135 }
136
137 return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
138 val callback = ContinuationCallback(cont)
139 Futures.addCallback(this, callback, MoreExecutors.directExecutor())
140 cont.invokeOnCancellation {
141 callback.cont = null // clear the reference to continuation from the future's callback
142 }
Roman Elizarovf106ff32017-05-17 12:22:14 +0300143 }
144}
145
146private class ContinuationCallback<T>(
147 @Volatile @JvmField var cont: Continuation<T>?
148) : FutureCallback<T> {
Roman Elizarov19bf4d52017-07-11 14:43:28 +0300149 @Suppress("UNCHECKED_CAST")
Roman Elizarovf106ff32017-05-17 12:22:14 +0300150 override fun onSuccess(result: T?) { cont?.resume(result as T) }
151 override fun onFailure(t: Throwable) { cont?.resumeWithException(t) }
152}