Validate FJP.commonPool parallelism to avoid pathological bugs
Also always construct private FJP pool if
kotlinx.coroutines.default.parallelism is explicitly specified.
Fixes #432, #288
diff --git a/core/kotlinx-coroutines-core/src/CommonPool.kt b/core/kotlinx-coroutines-core/src/CommonPool.kt
index 5c720d0..eafe97e 100644
--- a/core/kotlinx-coroutines-core/src/CommonPool.kt
+++ b/core/kotlinx-coroutines-core/src/CommonPool.kt
@@ -31,19 +31,20 @@
*/
public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"
- private val parallelism = run<Int> {
- val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) }
- if (property == null) {
- (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
- } else {
- val parallelism = property.toIntOrNull()
- if (parallelism == null || parallelism < 1) {
- error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
- }
- parallelism
+ // Equals to -1 if not explicitly specified
+ private val requestedParallelism = run<Int> {
+ val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) } ?: return@run -1
+ val parallelism = property.toIntOrNull()
+ if (parallelism == null || parallelism < 1) {
+ error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
}
+ parallelism
}
+ private val parallelism: Int
+ get() = requestedParallelism.takeIf { it > 0 }
+ ?: (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
+
// For debug and tests
private var usePrivatePool = false
@@ -54,17 +55,31 @@
private fun createPool(): ExecutorService {
if (System.getSecurityManager() != null) return createPlainPool()
+ // Reflection on ForkJoinPool class so that it works on JDK 6 (which is absent there)
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
- ?: return createPlainPool()
- if (!usePrivatePool) {
+ ?: return createPlainPool() // Fallback to plain thread pool
+ // Try to use commonPool unless parallelism was explicitly specified or int debug privatePool mode
+ if (!usePrivatePool && requestedParallelism < 0) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
+ ?.takeIf { isGoodCommonPool(fjpClass, it) }
?.let { return it }
}
+ // Try to create private ForkJoinPool instance
Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
?. let { return it }
+ // Fallback to plain thread pool
return createPlainPool()
}
+ /**
+ * Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs.
+ */
+ private fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean {
+ val actual = Try { fjpClass.getMethod("getParallelism").invoke(executor) as? Int }
+ ?: return false
+ return actual >= 1
+ }
+
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(parallelism) {