blob: f20753a7bc315088982a683921212212cd75df36 [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.timeunit.TimeUnit
import java.io.*
import java.util.concurrent.*
import kotlin.coroutines.experimental.*
/**
* [CoroutineDispatcher] that implements [Closeable]
*/
abstract class CloseableCoroutineDispatcher: CoroutineDispatcher(), Closeable
/**
* Converts an instance of [ExecutorService] to an implementation of [CloseableCoroutineDispatcher].
*/
public fun ExecutorService.asCoroutineDispatcher(): CloseableCoroutineDispatcher =
// we know that an implementation of Executor.asCoroutineDispatcher actually returns a closeable one
(this as Executor).asCoroutineDispatcher() as CloseableCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
* @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].
*/
@Deprecated("Renamed to `asCoroutineDispatcher`",
replaceWith = ReplaceWith("asCoroutineDispatcher()"))
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
ExecutorCoroutineDispatcher(this)
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*/
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
ExecutorCoroutineDispatcher(this)
private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
/**
* @suppress **This is unstable API and it is subject to change.**
*/
public abstract class ExecutorCoroutineDispatcherBase : CloseableCoroutineDispatcher(), Delay {
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal abstract val executor: Executor
override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
}
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
continuation.cancelFutureOnCompletion(timeout)
else
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(block, time, unit) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
return DisposableFutureHandle(timeout)
else
return DefaultExecutor.invokeOnTimeout(time, unit, block)
}
override fun close() {
(executor as? ExecutorService)?.shutdown()
}
override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}
private class ResumeUndispatchedRunnable(
private val dispatcher: CoroutineDispatcher,
private val continuation: CancellableContinuation<Unit>
) : Runnable {
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
}
/**
* An implementation of [DisposableHandle] that cancels the specified future on dispose.
* @suppress **This is unstable API and it is subject to change.**
*/
public class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
override fun dispose() {
future.cancel(false)
}
override fun toString(): String = "DisposableFutureHandle[$future]"
}