/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

/**
 * Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
 *
 * If there isn't a SecurityManager present it uses [java.util.concurrent.ForkJoinPool] when available, which implements
 * efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a separate task even
 * when it already executes inside the pool. When `ForkJoinPool.commonPool` is usable it is wrapped directly; otherwise
 * a similar shared pool is created.
 *
 * If there is a SecurityManager present (as would be if running inside a Java Web Start context) then a plain thread
 * pool is created. This is to work around the fact that ForkJoinPool creates threads that cannot perform
 * privileged actions.
 */
internal object CommonPool : ExecutorCoroutineDispatcher() {

    /**
     * Name of the property that controls default parallelism level of [CommonPool].
     * If the property is not specified, `Runtime.getRuntime().availableProcessors() - 1` will be used instead (or `1` for single-core JVM).
     * Note that until Java 10, if an application is run within a container,
     * `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return the real number of cores.
     */
    public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"

    /**
     * The executor backing this dispatcher. It is created on first access if no pool exists yet.
     */
    override val executor: Executor
        get() = pool ?: getOrCreatePoolSync()

    // Parallelism requested through the system property; equals -1 if not explicitly specified
    // (including the case when reading the property throws). A value that is present but is not
    // a positive integer makes this initializer fail via `error`.
    private val requestedParallelism = run<Int> {
        val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) } ?: return@run -1
        val parallelism = property.toIntOrNull()
        if (parallelism == null || parallelism < 1) {
            error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
        }
        parallelism
    }

    // Effective parallelism: the requested value when it was specified, otherwise one less than
    // the number of available processors, but never less than 1.
    private val parallelism: Int
        get() = requestedParallelism.takeIf { it > 0 }
            ?: (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)

    // For debug and tests
    private var usePrivatePool = false

    // Lazily created pool; written only from @Synchronized functions, read without locking.
    @Volatile
    private var pool: Executor? = null

    // Runs [block] and returns its result, or `null` if it throws anything.
    private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }

    /**
     * Creates the executor service to dispatch to.
     *
     * Choice order: a plain thread pool when a SecurityManager is installed or the ForkJoinPool class
     * cannot be loaded; `ForkJoinPool.commonPool` when neither private-pool mode nor an explicit
     * parallelism was requested and the common pool passes [isGoodCommonPool]; a private ForkJoinPool
     * with [parallelism] threads; and finally a plain thread pool if none of the above worked.
     */
    private fun createPool(): ExecutorService {
        if (System.getSecurityManager() != null) return createPlainPool()
        // ForkJoinPool is looked up reflectively so that this code also works on JDK 6, where the class is absent
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool() // Fallback to plain thread pool
        // Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode
        if (!usePrivatePool && requestedParallelism < 0) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.takeIf { isGoodCommonPool(fjpClass, it) }
                ?.let { return it }
        }
        // Try to create private ForkJoinPool instance
        Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
            ?.let { return it }
        // Fallback to plain thread pool
        return createPlainPool()
    }

    /**
     * Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs.
     *
     * @param fjpClass the reflectively loaded ForkJoinPool class, used to look up `getPoolSize`.
     * @param executor the candidate common pool.
     * @return `true` if, after a no-op task was submitted, the reported pool size is at least one;
     * `false` if it is smaller or the reflective call failed.
     */
    internal fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean {
        // We cannot use getParallelism, since it lies to us (always returns at least 1)
        // So we submit a task and check that getPoolSize is at least one after that
        // A broken FJP (that is configured for 0 parallelism) would not execute the task and
        // would report its pool size as zero.
        executor.submit {}
        val actual = Try { fjpClass.getMethod("getPoolSize").invoke(executor) as? Int }
            ?: return false
        return actual >= 1
    }

    // Creates a fixed-size pool of [parallelism] daemon threads named "CommonPool-worker-N".
    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(parallelism) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }

    // Returns the current pool or creates and stores a new one; synchronized so that
    // concurrent first uses end up with a single pool.
    @Synchronized
    private fun getOrCreatePoolSync(): Executor =
        pool ?: createPool().also { pool = it }

    /**
     * Submits [block], wrapped with `wrapTask`, to the pool, creating the pool first if needed.
     * If the pool rejects the task, `unTrackTask` is called and the original [block] is handed
     * to [DefaultExecutor] instead.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            (pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
        } catch (e: RejectedExecutionException) {
            unTrackTask()
            // CommonPool only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.enqueue(block)
        }
    }

    // used for tests: shuts the current pool down and switches to private-pool mode,
    // so that the next use creates a fresh pool instead of taking ForkJoinPool.commonPool
    @Synchronized
    internal fun usePrivatePool() {
        shutdown(0)
        usePrivatePool = true
        pool = null
    }

    // used for tests: shuts down the current pool if it is an ExecutorService, waiting up to
    // [timeout] milliseconds for termination when timeout is positive; tasks returned by
    // shutdownNow are re-enqueued to DefaultExecutor. Afterwards the pool is replaced with
    // an executor that rejects every task.
    @Synchronized
    internal fun shutdown(timeout: Long) {
        (pool as? ExecutorService)?.apply {
            shutdown()
            if (timeout > 0)
                awaitTermination(timeout, TimeUnit.MILLISECONDS)
            shutdownNow().forEach { DefaultExecutor.enqueue(it) }
        }
        pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
    }

    // used for tests: shuts the current pool down, leaves private-pool mode and clears the pool
    // so that it is created anew on next use
    @Synchronized
    internal fun restore() {
        shutdown(0)
        usePrivatePool = false
        pool = null
    }

    override fun toString(): String = "CommonPool"

    /**
     * Not supported: always fails via `error`.
     */
    override fun close(): Unit = error("Close cannot be invoked on CommonPool")
}