| package kotlinx.coroutines.experimental.scheduling |
| |
| import kotlinx.atomicfu.* |
| import kotlinx.coroutines.experimental.* |
| import java.io.* |
| import java.util.concurrent.* |
| import kotlin.coroutines.experimental.* |
| |
/**
 * Coroutine dispatcher that runs tasks on its own [CoroutineScheduler] instance, created with the
 * given core and maximum pool sizes. Delays are handed over to [DefaultExecutor].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
// TODO make internal after integration with Ktor
class ExperimentalCoroutineDispatcher(
    corePoolSize: Int = CORE_POOL_SIZE,
    maxPoolSize: Int = MAX_POOL_SIZE
) : CoroutineDispatcher(), Delay, Closeable {
    private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)

    /** Submits [block] to the underlying scheduler; [context] is not consulted. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }

    /** Same as [dispatch], but asks the scheduler for fair placement of [block]. */
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block, fair = true)
    }

    /** Delegates delayed resumption of [continuation] to [DefaultExecutor]. */
    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
        DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
    }

    /** Closes the underlying scheduler. */
    override fun close() {
        coroutineScheduler.close()
    }

    override fun toString(): String = "${super.toString()}[scheduler = $coroutineScheduler]"

    /**
     * Builds a dispatcher with limited parallelism, intended for tasks that may potentially block.
     * The returned [CoroutineDispatcher] owns no resources (threads) of its own: it piggybacks on this
     * [ExperimentalCoroutineDispatcher], running its tasks here while marking them as
     * [TaskMode.PROBABLY_BLOCKING] so that the original dispatcher can adjust its behaviour.
     *
     * @param parallelism parallelism level, i.e. how many threads may execute tasks of the returned context in parallel.
     * @throws IllegalArgumentException if [parallelism] is not positive.
     */
    fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        return LimitingBlockingDispatcher(
            dispatcher = this,
            parallelism = parallelism,
            taskMode = TaskMode.PROBABLY_BLOCKING
        )
    }

    /** Forwards [block] together with its task [context] and the [fair] flag to the underlying scheduler. */
    internal fun dispatchWithContext(block: Runnable, context: TaskContext?, fair: Boolean) {
        coroutineScheduler.dispatch(block, context, fair)
    }
}
| |
/**
 * Dispatcher that piggybacks on [dispatcher] and allows at most [parallelism] of its own tasks to be
 * submitted to it at any moment. Tasks arriving above the limit are parked in a local queue and are
 * resubmitted from [afterTask] or by a racing [dispatch] call.
 *
 * The instance itself is passed to the parent dispatcher as the [TaskContext] of every submitted task.
 */
private class LimitingBlockingDispatcher(
    val dispatcher: ExperimentalCoroutineDispatcher,
    val parallelism: Int,
    override val taskMode: TaskMode
) : CoroutineDispatcher(), Delay, TaskContext {

    // Tasks that could not be submitted because the parallelism limit was reached
    private val queue = ConcurrentLinkedQueue<Runnable>()

    // Number of currently committed task slots (may transiently exceed `parallelism`, see dispatch)
    private val inFlightTasks = atomic(0)

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    /**
     * Yielding tasks are dispatched in a fair manner, consistently with
     * [ExperimentalCoroutineDispatcher.dispatchYield]. Without this override the inherited
     * implementation would be used, which is presumed to delegate to the unfair [dispatch]
     * (the usual kotlinx.coroutines default).
     */
    override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(block, true)

    /**
     * Submits [block] to the parent dispatcher if a slot is free, otherwise parks it in the local queue.
     *
     * @param fair passed through to [ExperimentalCoroutineDispatcher.dispatchWithContext] on the fast path.
     */
    private fun dispatch(block: Runnable, fair: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Commit in-flight tasks slot
            val inFlight = inFlightTasks.incrementAndGet()

            // Fast path, if parallelism limit is not reached, dispatch task and return
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, fair)
                return
            }

            // Parallelism limit is reached, add task to the queue
            queue.add(taskToSchedule)

            /*
             * We haven't actually scheduled anything, so roll back the committed in-flight task slot:
             * If the amount of in-flight tasks is still at or above the limit, do nothing
             * If the amount of in-flight tasks is less than parallelism, then
             * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue
             * to avoid starvation.
             *
             * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
             *
             * T1: submit task, start execution, R == 1
             * T2: commit slot for next task, R == 2
             * T1: finish T1, R == 1
             * T2: submit next task to local queue, decrement R, R == 0
             * Without retries, task from T2 will be stuck in the local queue
             */
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            // A slot was freed concurrently: retry with the queue head (null means someone else took it)
            taskToSchedule = queue.poll() ?: return
        }
    }

    override fun toString(): String {
        return "${super.toString()}[dispatcher = $dispatcher]"
    }

    /**
     * Tries to dispatch tasks which were blocked due to reaching parallelism limit, if there are any.
     * Presumably invoked by the scheduler once a task submitted with this context has finished.
     *
     * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid
     * non-blocking continuations starvation.
     * E.g. for
     * ```
     * foo()
     * blocking()
     * bar()
     * ```
     * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
     */
    override fun afterTask() {
        var next = queue.poll()
        // If we have pending tasks in current blocking context, dispatch first.
        // The slot of the finished task is reused, so the in-flight counter is left untouched.
        if (next != null) {
            dispatcher.dispatchWithContext(next, this, true)
            return
        }
        inFlightTasks.decrementAndGet()

        /*
         * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue.
         * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
         * T1: submit task, start execution, R == 1
         * T2: commit slot for next task, R == 2
         * T1: finish T1, poll queue (it's still empty), R == 2
         * T2: submit next task to the local queue, decrement R, R == 1
         * T1: decrement R, finish. R == 0
         *
         * The task from T2 is stuck in the local queue
         */
        next = queue.poll() ?: return
        dispatch(next, true)
    }

    /** Delays are delegated to the parent [dispatcher]. */
    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
        dispatcher.scheduleResumeAfterDelay(time, unit, continuation)
}