blob: 0c03cfd17b0649cda10caa4feff4c707abd19a05 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarov3754f952017-01-18 20:47:54 +030017package kotlinx.coroutines.experimental
18
Roman Elizarov7cf452e2017-01-29 21:58:33 +030019import java.util.concurrent.ExecutorService
Roman Elizarov3754f952017-01-18 20:47:54 +030020import java.util.concurrent.Executors
Roman Elizarov3754f952017-01-18 20:47:54 +030021import java.util.concurrent.atomic.AtomicInteger
Roman Elizarovea4a51b2017-01-31 12:01:25 +030022import kotlin.coroutines.experimental.CoroutineContext
Roman Elizarov3754f952017-01-18 20:47:54 +030023
/**
 * Represents a common pool of shared threads as a coroutine dispatcher for compute-intensive tasks.
 *
 * It uses [java.util.concurrent.ForkJoinPool] when available, which implements an efficient work-stealing
 * algorithm for its queues, so every coroutine resumption is dispatched as a separate task even when it
 * already executes inside the pool.
 *
 * When available, it wraps `ForkJoinPool.commonPool`; otherwise it provides a similar shared pool of its own:
 * a privately constructed `ForkJoinPool`, or, as the last resort, a fixed-size pool of daemon threads.
 *
 * When a `SecurityManager` is installed, `ForkJoinPool` is not used at all and the fixed-size pool is
 * created instead, because the `ForkJoinPool` documentation states that its common pool then runs tasks
 * on threads that have no permissions enabled.
 */
object CommonPool : CoroutineDispatcher() {
    /** The executor that all dispatched blocks are submitted to; selected once when the object is initialized. */
    private val pool: ExecutorService = findPool()

    /**
     * Runs [block] and returns its result, or `null` if it throws anything.
     *
     * Swallowing every [Throwable] is deliberate: this helper only guards best-effort reflective lookups,
     * where any failure simply means "this option is not available here".
     */
    private inline fun <T> tryOrNull(block: () -> T): T? =
        try {
            block()
        } catch (e: Throwable) {
            null
        }

    /**
     * Selects the executor backing this dispatcher, trying in order:
     *
     * 1. the plain fixed-size pool, if a `SecurityManager` is installed;
     * 2. `ForkJoinPool.commonPool()`, looked up reflectively;
     * 3. a new `ForkJoinPool` constructed with [defaultParallelism];
     * 4. the plain fixed-size pool from [createPlainPool].
     *
     * `ForkJoinPool` is accessed via reflection only, so this class has no link-time dependency on it.
     */
    private fun findPool(): ExecutorService {
        // Under a SecurityManager the common pool's worker threads have no permissions (see ForkJoinPool
        // Javadoc), so coroutines would unexpectedly run without permissions. Use our own threads.
        if (System.getSecurityManager() != null) return createPlainPool()
        val fjpClass = tryOrNull { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool()
        // The static commonPool() method may be missing even when the class itself is present.
        tryOrNull { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
            ?.let { return it }
        // Int::class.java is the primitive `int`, matching the ForkJoinPool(int parallelism) constructor.
        tryOrNull { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
            ?.let { return it }
        return createPlainPool()
    }

    /**
     * Creates a fixed-size thread pool with [defaultParallelism] threads.
     *
     * Threads are named `CommonPool-worker-N` (N starts at 1) and are marked as daemon threads.
     */
    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(defaultParallelism()) { task ->
            Thread(task, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }

    /** Returns the number of available processors minus one, but never less than 1. */
    private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)

    /** Submits [block] to the shared pool for execution; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) = pool.execute(block)
}