blob: 96ad84de3406bcfeeb6fa9e02c1b3df58c7d89c6 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
/**
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
* [TimeoutCancellationException] if timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws [TimeoutCancellationException].
* Even if the code in the block suppresses [TimeoutCancellationException], it
* is still thrown by `withTimeout` invocation.
*
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
*
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
*
* @param time timeout time in milliseconds.
*/
public actual suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -> T): T {
require(time >= 0) { "Timeout time $time cannot be negative" }
if (time <= 0L) throw CancellationException("Timed out immediately")
return suspendCoroutineOrReturn { cont: Continuation<T> ->
setupTimeout(TimeoutCoroutine(time, cont), block)
}
}
private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
): Any? {
// schedule cancellation of this coroutine on time
val cont = coroutine.cont
val context = cont.context
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
coroutine.initParentJob(context[Job])
// restart block using new coroutine with new job,
// however start it as undispatched coroutine, because we are already in the proper context
val result = try {
block.startCoroutineUninterceptedOrReturn(receiver = coroutine, completion = coroutine)
} catch (e: Throwable) {
CompletedExceptionally(e)
}
return when {
result == COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
coroutine.makeCompletingOnce(result, MODE_IGNORE) -> {
if (result is CompletedExceptionally) throw result.exception else result
}
else -> COROUTINE_SUSPENDED
}
}
private open class TimeoutCoroutine<U, in T: U>(
val time: Int,
val cont: Continuation<U>
) : AbstractCoroutine<T>(cont.context, active = true), Runnable, Continuation<T> {
override val defaultResumeMode: Int get() = MODE_DIRECT
@Suppress("LeakingThis")
override fun run() {
cancel(TimeoutCancellationException(time, this))
}
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?, mode: Int) {
if (state is CompletedExceptionally)
cont.resumeWithExceptionMode(state.exception, mode)
else
cont.resumeMode(state as T, mode)
}
override fun toString(): String =
"TimeoutCoroutine($time)"
}
/**
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws [TimeoutCancellationException].
* Even if the code in the block suppresses [TimeoutCancellationException], this
* invocation of `withTimeoutOrNull` still returns `null`.
*
* The sibling function that throws exception on timeout is [withTimeout].
*
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
*
* @param time timeout time in milliseconds.
*/
public actual suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T? {
require(time >= 0) { "Timeout time $time cannot be negative" }
if (time <= 0L) return null
return suspendCoroutineOrReturn { cont: Continuation<T?> ->
setupTimeout(TimeoutOrNullCoroutine(time, cont), block)
}
}
private class TimeoutOrNullCoroutine<T>(
time: Int,
cont: Continuation<T?>
) : TimeoutCoroutine<T?, T>(time, cont) {
@Suppress("UNCHECKED_CAST")
override fun afterCompletion(state: Any?, mode: Int) {
if (state is CompletedExceptionally) {
val exception = state.exception
if (exception is TimeoutCancellationException && exception.coroutine === this)
cont.resumeMode(null, mode) else
cont.resumeWithExceptionMode(exception, mode)
} else
cont.resumeMode(state as T, mode)
}
override fun toString(): String =
"TimeoutOrNullCoroutine($time)"
}