blob: f20753a7bc315088982a683921212212cd75df36 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarov67891d82017-01-23 16:47:20 +030017package kotlinx.coroutines.experimental
18
Roman Elizarovaa461cf2018-04-11 13:20:29 +030019import kotlinx.coroutines.experimental.timeunit.TimeUnit
20import java.io.*
21import java.util.concurrent.*
22import kotlin.coroutines.experimental.*
Roman Elizarov67891d82017-01-23 16:47:20 +030023
24/**
Marko Devcic26f4b9e2018-03-20 20:28:52 +010025 * [CoroutineDispatcher] that implements [Closeable]
26 */
27abstract class CloseableCoroutineDispatcher: CoroutineDispatcher(), Closeable
28
29/**
30 * Converts an instance of [ExecutorService] to an implementation of [CloseableCoroutineDispatcher].
31 */
32public fun ExecutorService.asCoroutineDispatcher(): CloseableCoroutineDispatcher =
33 // we know that an implementation of Executor.asCoroutineDispatcher actually returns a closeable one
34 (this as Executor).asCoroutineDispatcher() as CloseableCoroutineDispatcher
35
36/**
Roman Elizarov67891d82017-01-23 16:47:20 +030037 * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
Roman Elizarov38b5ea12017-03-09 12:03:39 +030038 * @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].
Roman Elizarov67891d82017-01-23 16:47:20 +030039 */
Roman Elizarov38b5ea12017-03-09 12:03:39 +030040@Deprecated("Renamed to `asCoroutineDispatcher`",
41 replaceWith = ReplaceWith("asCoroutineDispatcher()"))
Roman Elizarov67891d82017-01-23 16:47:20 +030042public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
43 ExecutorCoroutineDispatcher(this)
44
Roman Elizarov38b5ea12017-03-09 12:03:39 +030045/**
46 * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
47 */
48public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
49 ExecutorCoroutineDispatcher(this)
50
Roman Elizarovf04f51d2017-04-18 15:28:35 +030051private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
52
Roman Elizarovd9ae2bc2017-10-20 17:36:56 +080053/**
54 * @suppress **This is unstable API and it is subject to change.**
55 */
Marko Devcic26f4b9e2018-03-20 20:28:52 +010056public abstract class ExecutorCoroutineDispatcherBase : CloseableCoroutineDispatcher(), Delay {
Roman Elizarovd9ae2bc2017-10-20 17:36:56 +080057 /**
58 * @suppress **This is unstable API and it is subject to change.**
59 */
60 internal abstract val executor: Executor
61
Roman Elizarov92b04852017-07-11 15:11:58 +030062 override fun dispatch(context: CoroutineContext, block: Runnable) =
Roman Elizarov35d2c342017-07-20 14:54:39 +030063 try { executor.execute(timeSource.trackTask(block)) }
64 catch (e: RejectedExecutionException) {
65 timeSource.unTrackTask()
66 DefaultExecutor.execute(block)
67 }
Roman Elizarov67891d82017-01-23 16:47:20 +030068
69 override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
Roman Elizarov92b04852017-07-11 15:11:58 +030070 val timeout =
71 try { (executor as? ScheduledExecutorService)
72 ?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
73 catch (e: RejectedExecutionException) { null }
Roman Elizarov35d2c342017-07-20 14:54:39 +030074 if (timeout != null)
75 continuation.cancelFutureOnCompletion(timeout)
76 else
77 DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
Roman Elizarov67891d82017-01-23 16:47:20 +030078 }
Roman Elizarovdaa1d9d2017-03-02 19:00:50 +030079
80 override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
Roman Elizarov92b04852017-07-11 15:11:58 +030081 val timeout =
82 try { (executor as? ScheduledExecutorService)
83 ?.schedule(block, time, unit) }
84 catch (e: RejectedExecutionException) { null }
Roman Elizarov35d2c342017-07-20 14:54:39 +030085 if (timeout != null)
86 return DisposableFutureHandle(timeout)
87 else
88 return DefaultExecutor.invokeOnTimeout(time, unit, block)
Roman Elizarovdaa1d9d2017-03-02 19:00:50 +030089 }
Roman Elizarov30dd5c12017-04-11 13:50:03 +030090
Marko Devcic26f4b9e2018-03-20 20:28:52 +010091 override fun close() {
92 (executor as? ExecutorService)?.shutdown()
93 }
94
Roman Elizarov30dd5c12017-04-11 13:50:03 +030095 override fun toString(): String = executor.toString()
Roman Elizarovf04f51d2017-04-18 15:28:35 +030096 override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
Roman Elizarov30dd5c12017-04-11 13:50:03 +030097 override fun hashCode(): Int = System.identityHashCode(executor)
Roman Elizarov67891d82017-01-23 16:47:20 +030098}
99
Roman Elizarovf04f51d2017-04-18 15:28:35 +0300100private class ResumeUndispatchedRunnable(
101 private val dispatcher: CoroutineDispatcher,
102 private val continuation: CancellableContinuation<Unit>
103) : Runnable {
104 override fun run() {
105 with(continuation) { dispatcher.resumeUndispatched(Unit) }
106 }
107}
Roman Elizarovaa461cf2018-04-11 13:20:29 +0300108
109/**
110 * An implementation of [DisposableHandle] that cancels the specified future on dispose.
111 * @suppress **This is unstable API and it is subject to change.**
112 */
113public class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
114 override fun dispose() {
115 future.cancel(false)
116 }
117 override fun toString(): String = "DisposableFutureHandle[$future]"
118}