blob: a55dd314a6e0ab474196b32a786205daf6950a9d [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental
import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.CoroutineContext
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
* @suppress **Deprecated**: Renamed to [asCoroutineDispatcher].
*/
@Deprecated("Renamed to `asCoroutineDispatcher`",
replaceWith = ReplaceWith("asCoroutineDispatcher()"))
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
ExecutorCoroutineDispatcher(this)
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*/
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
ExecutorCoroutineDispatcher(this)
private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
/**
* @suppress **This is unstable API and it is subject to change.**
*/
public abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal abstract val executor: Executor
override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
}
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
continuation.cancelFutureOnCompletion(timeout)
else
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(block, time, unit) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
return DisposableFutureHandle(timeout)
else
return DefaultExecutor.invokeOnTimeout(time, unit, block)
}
override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}
private class ResumeUndispatchedRunnable(
private val dispatcher: CoroutineDispatcher,
private val continuation: CancellableContinuation<Unit>
) : Runnable {
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
}