blob: 96ad84de3406bcfeeb6fa9e02c1b3df58c7d89c6 [file] [log] [blame]
Roman Elizarova7db8ec2017-12-21 22:45:12 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarov65eff0b2017-12-20 15:51:31 +030017package kotlinx.coroutines.experimental
18
Roman Elizarov19f48452017-12-21 18:48:20 +030019import kotlin.coroutines.experimental.Continuation
20import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
21import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
22import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
23
24/**
25 * Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
26 * [TimeoutCancellationException] if timeout was exceeded.
27 *
28 * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
29 * cancellable suspending function inside the block throws [TimeoutCancellationException].
30 * Even if the code in the block suppresses [TimeoutCancellationException], it
31 * is still thrown by `withTimeout` invocation.
32 *
33 * The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
34 *
35 * This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
36 * implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
37 *
38 * @param time timeout time in milliseconds.
39 */
40public actual suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -> T): T {
41 require(time >= 0) { "Timeout time $time cannot be negative" }
42 if (time <= 0L) throw CancellationException("Timed out immediately")
43 return suspendCoroutineOrReturn { cont: Continuation<T> ->
44 setupTimeout(TimeoutCoroutine(time, cont), block)
45 }
46}
47
48private fun <U, T: U> setupTimeout(
49 coroutine: TimeoutCoroutine<U, T>,
50 block: suspend CoroutineScope.() -> T
51): Any? {
52 // schedule cancellation of this coroutine on time
53 val cont = coroutine.cont
54 val context = cont.context
55 coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
56 coroutine.initParentJob(context[Job])
57 // restart block using new coroutine with new job,
58 // however start it as undispatched coroutine, because we are already in the proper context
59 val result = try {
60 block.startCoroutineUninterceptedOrReturn(receiver = coroutine, completion = coroutine)
61 } catch (e: Throwable) {
Roman Elizarovf29203c2018-01-11 12:39:36 +030062 CompletedExceptionally(e)
Roman Elizarov19f48452017-12-21 18:48:20 +030063 }
64 return when {
65 result == COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
Roman Elizarov4d626de2018-01-11 22:57:28 +030066 coroutine.makeCompletingOnce(result, MODE_IGNORE) -> {
Roman Elizarovf29203c2018-01-11 12:39:36 +030067 if (result is CompletedExceptionally) throw result.exception else result
Roman Elizarov19f48452017-12-21 18:48:20 +030068 }
69 else -> COROUTINE_SUSPENDED
70 }
71}
72
73private open class TimeoutCoroutine<U, in T: U>(
74 val time: Int,
75 val cont: Continuation<U>
76) : AbstractCoroutine<T>(cont.context, active = true), Runnable, Continuation<T> {
77 override val defaultResumeMode: Int get() = MODE_DIRECT
78
79 @Suppress("LeakingThis")
80 override fun run() {
81 cancel(TimeoutCancellationException(time, this))
82 }
83
84 @Suppress("UNCHECKED_CAST")
85 override fun afterCompletion(state: Any?, mode: Int) {
86 if (state is CompletedExceptionally)
87 cont.resumeWithExceptionMode(state.exception, mode)
88 else
89 cont.resumeMode(state as T, mode)
90 }
91
92 override fun toString(): String =
93 "TimeoutCoroutine($time)"
94}
95
96/**
97 * Runs a given suspending block of code inside a coroutine with a specified timeout and returns
98 * `null` if this timeout was exceeded.
99 *
100 * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
101 * cancellable suspending function inside the block throws [TimeoutCancellationException].
102 * Even if the code in the block suppresses [TimeoutCancellationException], this
103 * invocation of `withTimeoutOrNull` still returns `null`.
104 *
105 * The sibling function that throws exception on timeout is [withTimeout].
106 *
107 * This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
108 * implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
109 *
110 * @param time timeout time in milliseconds.
111 */
112public actual suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T? {
113 require(time >= 0) { "Timeout time $time cannot be negative" }
114 if (time <= 0L) return null
115 return suspendCoroutineOrReturn { cont: Continuation<T?> ->
116 setupTimeout(TimeoutOrNullCoroutine(time, cont), block)
117 }
118}
119
120private class TimeoutOrNullCoroutine<T>(
121 time: Int,
122 cont: Continuation<T?>
123) : TimeoutCoroutine<T?, T>(time, cont) {
124 @Suppress("UNCHECKED_CAST")
125 override fun afterCompletion(state: Any?, mode: Int) {
126 if (state is CompletedExceptionally) {
127 val exception = state.exception
128 if (exception is TimeoutCancellationException && exception.coroutine === this)
129 cont.resumeMode(null, mode) else
130 cont.resumeWithExceptionMode(exception, mode)
131 } else
132 cont.resumeMode(state as T, mode)
133 }
134
135 override fun toString(): String =
136 "TimeoutOrNullCoroutine($time)"
137}
138