/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 6 | |
Roman Elizarov | 1a016bd | 2017-07-12 11:41:34 +0300 | [diff] [blame] | 7 | import java.util.concurrent.* |
Vsevolod Tolstopyatov | 87f2faa | 2018-04-30 22:53:02 +0300 | [diff] [blame] | 8 | import java.util.concurrent.atomic.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 9 | import kotlin.coroutines.* |
Roman Elizarov | 3754f95 | 2017-01-18 20:47:54 +0300 | [diff] [blame] | 10 | |
/**
 * Represents a common pool of shared threads as a coroutine dispatcher for compute-intensive tasks.
 *
 * If there is no SecurityManager present, it uses [java.util.concurrent.ForkJoinPool] when that class is available.
 * ForkJoinPool implements an efficient work-stealing algorithm for its queues, so every coroutine resumption is
 * dispatched as a separate task even when it already executes inside the pool. `ForkJoinPool.commonPool` is wrapped
 * when it can be used; otherwise a private pool with similar characteristics is created.
 *
 * If there is a SecurityManager present (as would be if running inside a Java Web Start context) then a plain thread
 * pool is created. This is to work around the fact that ForkJoinPool creates threads that cannot perform
 * privileged actions.
 */
internal object CommonPool : ExecutorCoroutineDispatcher() {

    /**
     * Name of the property that controls default parallelism level of [CommonPool].
     * If the property is not specified, `Runtime.getRuntime().availableProcessors() - 1` will be used instead
     * (or `1` for single-core JVM).
     * Note that until Java 10, if an application is run within a container,
     * `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return
     * the real number of cores.
     */
    public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"

    /**
     * The executor backing this dispatcher.
     * The fast path is an unsynchronized read of the volatile [pool] field; when it is still `null`
     * the pool is created under the lock by [getOrCreatePoolSync].
     */
    override val executor: Executor
        get() = pool ?: getOrCreatePoolSync()

    /**
     * Parallelism requested via [DEFAULT_PARALLELISM_PROPERTY_NAME].
     * Equals to -1 if not explicitly specified (or if the property cannot be read).
     * Fails with [IllegalStateException] when the property is present but is not a positive integer.
     */
    private val requestedParallelism = run<Int> {
        // Reading a system property may throw (e.g. SecurityException); treat that as "not specified"
        val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) } ?: return@run -1
        val parallelism = property.toIntOrNull()
        if (parallelism == null || parallelism < 1) {
            error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
        }
        parallelism
    }

    /**
     * Effective parallelism for pools created by this object: the explicitly requested value when there is one,
     * otherwise the number of available processors minus one, but never less than `1`.
     */
    private val parallelism: Int
        get() = requestedParallelism.takeIf { it > 0 }
            ?: (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)

    // For debug and tests: when set, `ForkJoinPool.commonPool` is never used by createPool
    private var usePrivatePool = false

    // Lazily created backing executor; `null` means "not created yet" (see getOrCreatePoolSync)
    @Volatile
    private var pool: Executor? = null

    /**
     * Runs [block] and returns its result, or `null` if it throws anything.
     * Used for best-effort reflective probing where any failure simply means "not available".
     */
    private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }

    /**
     * Creates the backing executor, trying the candidates in this order:
     *
     * 1. a plain thread pool, when a SecurityManager is installed or the ForkJoinPool class cannot be loaded;
     * 2. `ForkJoinPool.commonPool`, unless parallelism was explicitly requested, the private pool mode is on,
     *    or the common pool fails the [isGoodCommonPool] check;
     * 3. a private ForkJoinPool instance with [parallelism] threads;
     * 4. a plain thread pool as the last resort.
     */
    private fun createPool(): ExecutorService {
        if (System.getSecurityManager() != null) return createPlainPool()
        // ForkJoinPool is accessed reflectively so that this code also works on JDK 6, where that class is absent
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool() // Fallback to plain thread pool
        // Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode
        if (!usePrivatePool && requestedParallelism < 0) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.takeIf { isGoodCommonPool(fjpClass, it) }
                ?.let { return it }
        }
        // Try to create private ForkJoinPool instance
        Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
            ?.let { return it }
        // Fallback to plain thread pool
        return createPlainPool()
    }

    /**
     * Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs.
     *
     * @param fjpClass the reflectively loaded ForkJoinPool class, used to look up `getPoolSize`.
     * @param executor the candidate pool; an empty task is submitted to it as a probe.
     * @return `true` if the pool reports a size of at least one after the probe was submitted,
     * `false` if the size is zero or cannot be obtained.
     */
    internal fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean {
        // We cannot use getParallelism, since it lies to us (always returns at least 1)
        // So we submit a task and check that getPoolSize is at least one after that
        // A broken FJP (that is configured for 0 parallelism) would not execute the task and
        // would report its pool size as zero.
        executor.submit {}
        val actual = Try { fjpClass.getMethod("getPoolSize").invoke(executor) as? Int }
            ?: return false
        return actual >= 1
    }

    /**
     * Creates a fixed-size thread pool of [parallelism] daemon threads named `CommonPool-worker-N`,
     * where `N` is a counter starting at 1.
     */
    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(parallelism) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }

    /**
     * Slow path of pool access: re-checks [pool] while holding the lock and creates it if it is still absent.
     */
    @Synchronized
    private fun getOrCreatePoolSync(): Executor =
        pool ?: createPool().also { pool = it }

    /**
     * Submits [block] to the backing pool, creating the pool on first use.
     * If the pool rejects the task, the task is handed over to [DefaultExecutor] instead of being dropped.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            (pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
        } catch (e: RejectedExecutionException) {
            // NOTE(review): presumably balances the bookkeeping done by wrapTask above -- confirm at its definition
            unTrackTask()
            // CommonPool only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.enqueue(block)
        }
    }

    /**
     * Used for tests: shuts down the current pool and switches to the private pool mode,
     * so the next access creates a fresh pool that is not `ForkJoinPool.commonPool`.
     */
    @Synchronized
    internal fun usePrivatePool() {
        shutdown(0)
        usePrivatePool = true
        pool = null // forces re-creation on next access
    }

    /**
     * Used for tests: shuts down the current pool.
     *
     * Tasks that the pool had accepted but never started are re-enqueued into [DefaultExecutor].
     * Afterwards [pool] is replaced with an executor that rejects every task, which makes [dispatch]
     * fall back to [DefaultExecutor] as well.
     *
     * @param timeout how long to wait for termination, in milliseconds; no waiting is done when it is not positive.
     */
    @Synchronized
    internal fun shutdown(timeout: Long) {
        (pool as? ExecutorService)?.apply {
            shutdown()
            if (timeout > 0)
                awaitTermination(timeout, TimeUnit.MILLISECONDS)
            shutdownNow().forEach { DefaultExecutor.enqueue(it) }
        }
        pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
    }

    /**
     * Used for tests: shuts down the current pool and leaves the private pool mode,
     * so the next access creates the pool again using the default selection rules.
     */
    @Synchronized
    internal fun restore() {
        shutdown(0)
        usePrivatePool = false
        pool = null // forces re-creation on next access
    }

    override fun toString(): String = "CommonPool"

    /** Always fails: this shared dispatcher cannot be closed. */
    override fun close(): Unit = error("Close cannot be invoked on CommonPool")
}