blob: 5c720d0ee5d38cf334aee62f2de0931ba1af547a [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov3754f952017-01-18 20:47:54 +03005package kotlinx.coroutines.experimental
6
Roman Elizarovaa461cf2018-04-11 13:20:29 +03007import kotlinx.coroutines.experimental.timeunit.TimeUnit
Roman Elizarov1a016bd2017-07-12 11:41:34 +03008import java.util.concurrent.*
Vsevolod Tolstopyatov87f2faa2018-04-30 22:53:02 +03009import java.util.concurrent.atomic.*
10import kotlin.coroutines.experimental.*
Roman Elizarov3754f952017-01-18 20:47:54 +030011
12/**
Roman Elizaroved7b8642017-01-19 11:22:28 +030013 * Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
Nikolay Metchevc1c380c2018-02-11 13:13:08 +000014 *
15 * If there isn't a SecurityManager present it uses [java.util.concurrent.ForkJoinPool] when available, which implements
16 * efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a separate task even
17 * when it already executes inside the pool. When available, it wraps `ForkJoinPool.commonPool` and provides a similar
18 * shared pool where not.
19 *
20 * If there is a SecurityManager present (as would be if running inside a Java Web Start context) then a plain thread
21 * pool is created. This is to work around the fact that ForkJoinPool creates threads that cannot perform
22 * privileged actions.
Roman Elizarov3754f952017-01-18 20:47:54 +030023 */
Roman Elizarov67891d82017-01-23 16:47:20 +030024object CommonPool : CoroutineDispatcher() {
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030025
26 /**
27 * Name of the property that controls default parallelism level of [CommonPool].
28 * If the property is not specified, `Runtime.getRuntime().availableProcessors() - 1` will be used instead (or `1` for single-core JVM).
29 * Note that until Java 10, if an application is run within a container,
paolope06b6ca2018-06-09 09:48:44 +000030 * `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return the real number of cores.
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030031 */
32 public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"
33
34 private val parallelism = run<Int> {
35 val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) }
36 if (property == null) {
37 (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
38 } else {
39 val parallelism = property.toIntOrNull()
40 if (parallelism == null || parallelism < 1) {
41 error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
42 }
43 parallelism
44 }
45 }
46
47 // For debug and tests
Roman Elizarov731f0ad2017-02-22 20:48:45 +030048 private var usePrivatePool = false
49
50 @Volatile
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030051 private var pool: Executor? = null
Roman Elizarov3754f952017-01-18 20:47:54 +030052
53 private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
54
Roman Elizarov731f0ad2017-02-22 20:48:45 +030055 private fun createPool(): ExecutorService {
Nikolay Metchevc1c380c2018-02-11 13:13:08 +000056 if (System.getSecurityManager() != null) return createPlainPool()
Roman Elizarov3754f952017-01-18 20:47:54 +030057 val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
58 ?: return createPlainPool()
Roman Elizarov731f0ad2017-02-22 20:48:45 +030059 if (!usePrivatePool) {
60 Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
61 ?.let { return it }
62 }
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030063 Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
Roman Elizarov3754f952017-01-18 20:47:54 +030064 ?. let { return it }
65 return createPlainPool()
66 }
67
Roman Elizarov7cf452e2017-01-29 21:58:33 +030068 private fun createPlainPool(): ExecutorService {
Roman Elizarov3754f952017-01-18 20:47:54 +030069 val threadId = AtomicInteger()
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030070 return Executors.newFixedThreadPool(parallelism) {
Roman Elizarov3754f952017-01-18 20:47:54 +030071 Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
72 }
73 }
74
Roman Elizarov731f0ad2017-02-22 20:48:45 +030075 @Synchronized
Roman Elizarov1a016bd2017-07-12 11:41:34 +030076 private fun getOrCreatePoolSync(): Executor =
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030077 pool ?: createPool().also { pool = it }
Roman Elizarov731f0ad2017-02-22 20:48:45 +030078
79 override fun dispatch(context: CoroutineContext, block: Runnable) =
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030080 try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
Roman Elizarov35d2c342017-07-20 14:54:39 +030081 catch (e: RejectedExecutionException) {
82 timeSource.unTrackTask()
83 DefaultExecutor.execute(block)
84 }
Roman Elizarov731f0ad2017-02-22 20:48:45 +030085
86 // used for tests
87 @Synchronized
88 internal fun usePrivatePool() {
Roman Elizarov1a016bd2017-07-12 11:41:34 +030089 shutdown(0)
Roman Elizarov731f0ad2017-02-22 20:48:45 +030090 usePrivatePool = true
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030091 pool = null
Roman Elizarov731f0ad2017-02-22 20:48:45 +030092 }
93
94 // used for tests
95 @Synchronized
Roman Elizarov1a016bd2017-07-12 11:41:34 +030096 internal fun shutdown(timeout: Long) {
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +030097 (pool as? ExecutorService)?.apply {
Roman Elizarov731f0ad2017-02-22 20:48:45 +030098 shutdown()
99 if (timeout > 0)
100 awaitTermination(timeout, TimeUnit.MILLISECONDS)
Roman Elizarov35d2c342017-07-20 14:54:39 +0300101 shutdownNow().forEach { DefaultExecutor.execute(it) }
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300102 }
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +0300103 pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
Roman Elizarov1a016bd2017-07-12 11:41:34 +0300104 }
105
106 // used for tests
107 @Synchronized
108 internal fun restore() {
109 shutdown(0)
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300110 usePrivatePool = false
Vsevolod Tolstopyatov6a0ce762018-05-22 19:47:50 +0300111 pool = null
Roman Elizarov731f0ad2017-02-22 20:48:45 +0300112 }
Roman Elizarovdc9fd1c2017-04-07 10:35:28 +0300113
114 override fun toString(): String = "CommonPool"
Roman Elizarov3754f952017-01-18 20:47:54 +0300115}