blob: 81ef0adfee1a4ad1560e59128cdee7f460362ea2 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov67891d82017-01-23 16:47:20 +03005package kotlinx.coroutines.experimental
6
Roman Elizarovaa461cf2018-04-11 13:20:29 +03007import kotlinx.coroutines.experimental.timeunit.TimeUnit
8import java.io.*
9import java.util.concurrent.*
10import kotlin.coroutines.experimental.*
Roman Elizarov67891d82017-01-23 16:47:20 +030011
/**
 * A [CoroutineDispatcher] that is also [Closeable].
 *
 * The class adds no members of its own; it only combines the two supertypes so that
 * a dispatcher can be passed to APIs that expect a [Closeable].
 */
public abstract class CloseableCoroutineDispatcher : CoroutineDispatcher(), Closeable
16
/**
 * Converts an instance of [ExecutorService] to an implementation of [CloseableCoroutineDispatcher].
 *
 * The dispatcher is constructed directly rather than by downcasting the result of
 * [Executor.asCoroutineDispatcher], so the closeable return type is checked at compile time
 * instead of relying on a runtime cast.
 *
 * @return a dispatcher backed by this executor service.
 */
public fun ExecutorService.asCoroutineDispatcher(): CloseableCoroutineDispatcher =
    ExecutorCoroutineDispatcher(this)
23
/**
 * Wraps this [Executor] into a [CoroutineDispatcher].
 *
 * Kept only for source compatibility; it produces the same kind of dispatcher as
 * [asCoroutineDispatcher], to which it delegates.
 *
 * @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].
 */
@Deprecated(
    "Renamed to `asCoroutineDispatcher`",
    replaceWith = ReplaceWith("asCoroutineDispatcher()")
)
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher {
    return asCoroutineDispatcher()
}
32
/**
 * Wraps this [Executor] into a [CoroutineDispatcher] backed by it.
 *
 * @return a new dispatcher instance that holds a reference to this executor.
 */
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher {
    return ExecutorCoroutineDispatcher(this)
}
38
/**
 * Concrete dispatcher that supplies the wrapped [executor] to [ExecutorCoroutineDispatcherBase];
 * all behaviour is inherited from the base class.
 */
private class ExecutorCoroutineDispatcher(
    override val executor: Executor
) : ExecutorCoroutineDispatcherBase()
40
/**
 * Base class for dispatchers that hand their work to an [Executor].
 *
 * Timed operations are scheduled on [executor] when it is a [ScheduledExecutorService].
 * When it is not, or when it rejects the task, the operation is delegated to [DefaultExecutor].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
public abstract class ExecutorCoroutineDispatcherBase : CloseableCoroutineDispatcher(), Delay {
    /**
     * The executor this dispatcher submits work to.
     *
     * @suppress **This is unstable API and it is subject to change.**
     */
    internal abstract val executor: Executor

    /**
     * Submits [block] to [executor]; if the executor rejects it, the block is handed to
     * [DefaultExecutor] instead.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(timeSource.trackTask(block))
        } catch (e: RejectedExecutionException) {
            // Pair the trackTask call above with unTrackTask before falling back.
            timeSource.unTrackTask()
            DefaultExecutor.execute(block)
        }
    }

    /**
     * Schedules resumption of [continuation] after the given delay.
     *
     * If [executor] accepted the scheduled task, the resulting future is registered with the
     * continuation via `cancelFutureOnCancellation`; otherwise the request goes to [DefaultExecutor].
     */
    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
        val scheduled = try {
            (executor as? ScheduledExecutorService)
                ?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit)
        } catch (e: RejectedExecutionException) {
            null
        }
        if (scheduled == null) {
            // Not a scheduling executor, or the task was rejected.
            DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
            return
        }
        continuation.cancelFutureOnCancellation(scheduled)
    }

    /**
     * Schedules [block] to run after the given delay.
     *
     * @return a [DisposableFutureHandle] over the scheduled future when [executor] accepted the
     * task, otherwise whatever [DefaultExecutor] returns for the same request.
     */
    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
        val scheduled = try {
            (executor as? ScheduledExecutorService)
                ?.schedule(block, time, unit)
        } catch (e: RejectedExecutionException) {
            null
        }
        return scheduled?.let { DisposableFutureHandle(it) }
            ?: DefaultExecutor.invokeOnTimeout(time, unit, block)
    }

    /**
     * Calls `shutdown()` on [executor] when it is an [ExecutorService]; does nothing otherwise.
     */
    override fun close() {
        val service = executor as? ExecutorService ?: return
        service.shutdown()
    }

    /** Returns the string representation of [executor]. */
    override fun toString(): String {
        return executor.toString()
    }

    /** Two dispatchers are equal when they wrap the very same executor instance. */
    override fun equals(other: Any?): Boolean {
        if (other !is ExecutorCoroutineDispatcherBase) return false
        return other.executor === executor
    }

    /** Identity hash of [executor], consistent with the reference comparison in [equals]. */
    override fun hashCode(): Int {
        return System.identityHashCode(executor)
    }
}
87
/**
 * [Runnable] that, when run, calls `resumeUndispatched(Unit)` for [dispatcher]
 * with [continuation] as the implicit receiver.
 */
private class ResumeUndispatchedRunnable(
    private val dispatcher: CoroutineDispatcher,
    private val continuation: CancellableContinuation<Unit>
) : Runnable {
    override fun run() {
        // resumeUndispatched is invoked on the dispatcher inside the continuation's scope.
        with(continuation) {
            dispatcher.resumeUndispatched(Unit)
        }
    }
}
Roman Elizarovaa461cf2018-04-11 13:20:29 +030096
/**
 * A [DisposableHandle] backed by a [Future]: disposing the handle cancels the future.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
public class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
    /** Cancels the wrapped future, passing `false` for `mayInterruptIfRunning`. */
    override fun dispose() {
        future.cancel(false)
    }

    override fun toString(): String {
        return "DisposableFutureHandle[$future]"
    }
}