Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 1 | package kotlinx.coroutines.experimental |
| 2 | |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 3 | import java.util.concurrent.ScheduledExecutorService |
Roman Elizarov | 55a47f9 | 2017-01-24 10:21:31 +0300 | [diff] [blame] | 4 | import java.util.concurrent.ScheduledThreadPoolExecutor |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 5 | import java.util.concurrent.TimeUnit |
| 6 | import kotlin.coroutines.startCoroutine |
| 7 | |
Roman Elizarov | 55a47f9 | 2017-01-24 10:21:31 +0300 | [diff] [blame] | 8 | val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 50L) |
| 9 | |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 10 | internal val scheduledExecutor by lazy<ScheduledExecutorService> { |
Roman Elizarov | 55a47f9 | 2017-01-24 10:21:31 +0300 | [diff] [blame] | 11 | ScheduledThreadPoolExecutor(1) { r -> |
| 12 | Thread(r, "kotlinx.coroutines.ScheduledExecutor") |
| 13 | }.apply { |
| 14 | setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS) |
| 15 | allowCoreThreadTimeOut(true) |
| 16 | // "setRemoveOnCancelPolicy" is available only since JDK7, so try it via reflection |
| 17 | try { |
| 18 | val m = this::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.javaPrimitiveType) |
| 19 | m.invoke(this, true) |
| 20 | } catch (ex: Throwable) { /* ignore */ } |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 21 | } |
| 22 | } |
| 23 | |
| 24 | /** |
| 25 | * Runs a given suspending block of code inside a coroutine with a specified timeout and throws |
| 26 | * [CancellationException] if timeout was exceeded. |
| 27 | */ |
| 28 | suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T { |
| 29 | require(time >= 0) { "Timeout time $time cannot be negative" } |
| 30 | if (time <= 0L) throw CancellationException("Timed out immediately") |
| 31 | return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> |
| 32 | // schedule cancellation of this continuation on time |
| 33 | val timeout = scheduledExecutor.schedule({ |
| 34 | // create an exception with a specific text |
| 35 | cont.cancel(CancellationException("Timed out waiting for $time $unit")) |
| 36 | }, time, unit) |
| 37 | cont.cancelFutureOnCompletion(timeout) |
| 38 | // restart block in a separate coroutine using cancellable context of this continuation, |
| 39 | block.startCoroutine(cont) |
| 40 | } |
| 41 | } |