blob: e47035feb3228a3f7a21c22587e2cf654b8bf64a [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlinx.coroutines.experimental.timeunit.*
import kotlin.coroutines.experimental.*
import kotlin.coroutines.experimental.intrinsics.*
/**
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
* [TimeoutCancellationException] if timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws [TimeoutCancellationException].
* Even if the code in the block suppresses [TimeoutCancellationException], it
* is still thrown by `withTimeout` invocation.
*
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
*
* @param time timeout time in milliseconds.
*/
public suspend fun <T> withTimeout(time: Int, block: suspend CoroutineScope.() -> T): T =
withTimeout(time.toLong(), TimeUnit.MILLISECONDS, block)
/**
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
* [TimeoutCancellationException] if timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws [TimeoutCancellationException].
* Even if the code in the block suppresses [TimeoutCancellationException], it
* is still thrown by `withTimeout` invocation.
*
* The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
*
* @param time timeout time
* @param unit timeout unit (milliseconds by default)
*/
public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T {
if (time <= 0L) throw CancellationException("Timed out immediately")
return suspendCoroutineOrReturn { cont: Continuation<T> ->
setupTimeout(TimeoutCoroutine(time, unit, cont), block)
}
}
private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
): Any? {
// schedule cancellation of this coroutine on time
val cont = coroutine.cont
val context = cont.context
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine.unit, coroutine))
// restart block using new coroutine with new job,
// however start it as undispatched coroutine, because we are already in the proper context
return coroutine.startUndispatchedOrReturn(coroutine, block)
}
/**
* @suppress **Deprecated**: for binary compatibility only
*/
@Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T =
withTimeout(time, unit) { block() }
private open class TimeoutCoroutine<U, in T: U>(
@JvmField val time: Long,
@JvmField val unit: TimeUnit,
@JvmField val cont: Continuation<U>
) : AbstractCoroutine<T>(cont.context, active = true), Runnable, Continuation<T> {
override val defaultResumeMode: Int get() = MODE_DIRECT
@Suppress("LeakingThis")
override fun run() {
cancel(TimeoutCancellationException(time, unit, this))
}
@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int) {
if (state is CompletedExceptionally)
cont.resumeWithExceptionMode(state.cause, mode)
else
cont.resumeMode(state as T, mode)
}
override fun nameString(): String =
"${super.nameString()}($time $unit)"
}
/**
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws [TimeoutCancellationException].
* Even if the code in the block suppresses [TimeoutCancellationException], this
* invocation of `withTimeoutOrNull` still returns `null`.
*
* The sibling function that throws exception on timeout is [withTimeout].
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
*
* @param time timeout time in milliseconds.
*/
public suspend fun <T> withTimeoutOrNull(time: Int, block: suspend CoroutineScope.() -> T): T? =
withTimeoutOrNull(time.toLong(), TimeUnit.MILLISECONDS, block)
/**
* Runs a given suspending block of code inside a coroutine with a specified timeout and returns
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws [TimeoutCancellationException].
* Even if the code in the block suppresses [TimeoutCancellationException], this
* invocation of `withTimeoutOrNull` still returns `null`.
*
* The sibling function that throws exception on timeout is [withTimeout].
* Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
*
* @param time timeout time
* @param unit timeout unit (milliseconds by default)
*/
public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend CoroutineScope.() -> T): T? {
if (time <= 0L) return null
return suspendCoroutineOrReturn { cont: Continuation<T?> ->
setupTimeout(TimeoutOrNullCoroutine(time, unit, cont), block)
}
}
/**
* @suppress **Deprecated**: for binary compatibility only
*/
@Deprecated("for binary compatibility only", level=DeprecationLevel.HIDDEN)
public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T? =
withTimeoutOrNull(time, unit) { block() }
private class TimeoutOrNullCoroutine<T>(
time: Long,
unit: TimeUnit,
cont: Continuation<T?>
) : TimeoutCoroutine<T?, T>(time, unit, cont) {
@Suppress("UNCHECKED_CAST")
internal override fun onCompletionInternal(state: Any?, mode: Int) {
if (state is CompletedExceptionally) {
val exception = state.cause
if (exception is TimeoutCancellationException && exception.coroutine === this)
cont.resumeMode(null, mode) else
cont.resumeWithExceptionMode(exception, mode)
} else
cont.resumeMode(state as T, mode)
}
}
/**
* This exception is thrown by [withTimeout] to indicate timeout.
*/
public class TimeoutCancellationException internal constructor(
message: String,
@JvmField internal val coroutine: Job?
) : CancellationException(message) {
/**
* Creates timeout exception with a given message.
*/
public constructor(message: String) : this(message, null)
}
@Suppress("FunctionName")
internal fun TimeoutCancellationException(
time: Long,
unit: TimeUnit,
coroutine: Job
) : TimeoutCancellationException = TimeoutCancellationException("Timed out waiting for $time $unit", coroutine)