blob: 81ef0adfee1a4ad1560e59128cdee7f460362ea2 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.timeunit.TimeUnit
import java.io.*
import java.util.concurrent.*
import kotlin.coroutines.experimental.*
/**
* [CoroutineDispatcher] that implements [Closeable]
*/
abstract class CloseableCoroutineDispatcher: CoroutineDispatcher(), Closeable
/**
* Converts an instance of [ExecutorService] to an implementation of [CloseableCoroutineDispatcher].
*/
public fun ExecutorService.asCoroutineDispatcher(): CloseableCoroutineDispatcher =
// we know that an implementation of Executor.asCoroutineDispatcher actually returns a closeable one
(this as Executor).asCoroutineDispatcher() as CloseableCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
* @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].
*/
@Deprecated("Renamed to `asCoroutineDispatcher`",
replaceWith = ReplaceWith("asCoroutineDispatcher()"))
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
ExecutorCoroutineDispatcher(this)
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*/
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
ExecutorCoroutineDispatcher(this)
private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
/**
* @suppress **This is unstable API and it is subject to change.**
*/
public abstract class ExecutorCoroutineDispatcherBase : CloseableCoroutineDispatcher(), Delay {
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal abstract val executor: Executor
override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
}
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
continuation.cancelFutureOnCancellation(timeout)
else
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(block, time, unit) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
return DisposableFutureHandle(timeout)
else
return DefaultExecutor.invokeOnTimeout(time, unit, block)
}
override fun close() {
(executor as? ExecutorService)?.shutdown()
}
override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}
private class ResumeUndispatchedRunnable(
private val dispatcher: CoroutineDispatcher,
private val continuation: CancellableContinuation<Unit>
) : Runnable {
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
}
/**
* An implementation of [DisposableHandle] that cancels the specified future on dispose.
* @suppress **This is unstable API and it is subject to change.**
*/
public class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
override fun dispose() {
future.cancel(false)
}
override fun toString(): String = "DisposableFutureHandle[$future]"
}