Introduce IO dispatcher to offload blocking I/O-intensive tasks
Fixes #79
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index 500d3de..152afc3 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -132,7 +132,9 @@
public static final field DEBUG_PROPERTY_VALUE_AUTO Ljava/lang/String;
public static final field DEBUG_PROPERTY_VALUE_OFF Ljava/lang/String;
public static final field DEBUG_PROPERTY_VALUE_ON Ljava/lang/String;
+ public static final field IO_PARALLELISM_PROPERTY_NAME Ljava/lang/String;
public static final fun getDefaultDispatcher ()Lkotlinx/coroutines/experimental/CoroutineDispatcher;
+ public static final fun getIO ()Lkotlinx/coroutines/experimental/CoroutineDispatcher;
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;)Lkotlin/coroutines/experimental/CoroutineContext;
public static synthetic fun newCoroutineContext$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlin/coroutines/experimental/CoroutineContext;
diff --git a/common/kotlinx-coroutines-core-common/src/CoroutineDispatcher.kt b/common/kotlinx-coroutines-core-common/src/CoroutineDispatcher.kt
index 136efd5..59ef413 100644
--- a/common/kotlinx-coroutines-core-common/src/CoroutineDispatcher.kt
+++ b/common/kotlinx-coroutines-core-common/src/CoroutineDispatcher.kt
@@ -10,16 +10,19 @@
* Base class that shall be extended by all coroutine dispatcher implementations.
*
* The following standard implementations are provided by `kotlinx.coroutines`:
+ *
+ * * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
+ * is specified in their context. It is currently equal to [CommonPool] (subject to change in the future).
+ * This is an appropriate choice for compute-intensive coroutines that consume CPU resources.
+ * * [CommonPool] -- schedules coroutine execution to a common pool of shared background threads designed
+ * to be used for compute-intensive code.
+ * * [IO] -- uses a shared pool of on-demand created threads and is designed for offloading of IO-intensive _blocking_
+ * operations (like file I/O and blocking socket I/O).
* * [Unconfined] -- starts coroutine execution in the current call-frame until the first suspension.
* On first suspension the coroutine builder function returns.
- * The coroutine will resume in whatever thread that is used by the
+ * The coroutine resumes in whatever thread that is used by the
* corresponding suspending function, without confining it to any specific thread or pool.
- * This in an appropriate choice for IO-intensive coroutines that do not consume CPU resources.
- * * [DefaultDispatcher] -- is used by all standard builder if no dispatcher nor any other [ContinuationInterceptor]
- * is specified in their context. It is currently equal to [CommonPool] (subject to change).
- * * [CommonPool] -- immediately returns from the coroutine builder and schedules coroutine execution to
- * a common pool of shared background threads.
- * This is an appropriate choice for compute-intensive coroutines that consume a lot of CPU resources.
+ * **Unconfined dispatcher should not be normally used in code**.
* * Private thread pools can be created with [newSingleThreadContext] and [newFixedThreadPoolContext].
* * An arbitrary [Executor][java.util.concurrent.Executor] can be converted to dispatcher with [asCoroutineDispatcher] extension function.
*
diff --git a/core/kotlinx-coroutines-core/src/CoroutineContext.kt b/core/kotlinx-coroutines-core/src/CoroutineContext.kt
index 2fcd014..fd6ced5 100644
--- a/core/kotlinx-coroutines-core/src/CoroutineContext.kt
+++ b/core/kotlinx-coroutines-core/src/CoroutineContext.kt
@@ -10,7 +10,7 @@
import kotlin.coroutines.experimental.*
/**
- * Name of the property that control coroutine debugging. See [newCoroutineContext].
+ * Name of the property that controls coroutine debugging. See [newCoroutineContext].
*/
public const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"
@@ -56,14 +56,34 @@
}
/**
- * This is the default [CoroutineDispatcher] that is used by all standard builders like
+ * The default [CoroutineDispatcher] that is used by all standard builders like
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is currently equal to [CommonPool], but the value is subject to change in the future.
+ * You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`")
+ * to use an experimental coroutine dispatcher that shares threads with [IO] dispatcher and thus can switch to
+ * [IO] context without performing an actual thread context switch.
*/
@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher =
- if (useCoroutinesScheduler) ExperimentalCoroutineDispatcher() else CommonPool
+ if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool
+
+/**
+ * Name of the property that defines the maximal number of threads that are used by [IO] coroutines dispatcher.
+ */
+public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.parallelism"
+
+/**
+ * The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
+ *
+ * Additional threads in this pool are created and are shutdown on demand.
+ * The number of threads used by this dispatcher is limited by the value of
+ * "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
+ * It defaults to the limit of 64 threads or the number of cores (whichever is larger).
+ */
+public val IO by lazy {
+ BackgroundDispatcher.blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
+}
/**
* Creates context for the new coroutine. It installs [DefaultDispatcher] when no other dispatcher nor
diff --git a/core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt
index 39d8a7c..4845136 100644
--- a/core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/scheduling/ExperimentalCoroutineDispatcher.kt
@@ -10,10 +10,15 @@
import kotlin.coroutines.experimental.*
/**
+ * Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines).
+ */
+internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher()
+
+/**
* @suppress **This is unstable API and it is subject to change.**
*/
// TODO make internal (and rename) after complete integration
-class ExperimentalCoroutineDispatcher(
+open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long
diff --git a/core/kotlinx-coroutines-core/test/IODispatcherTest.kt b/core/kotlinx-coroutines-core/test/IODispatcherTest.kt
new file mode 100644
index 0000000..71d9346
--- /dev/null
+++ b/core/kotlinx-coroutines-core/test/IODispatcherTest.kt
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+import kotlinx.coroutines.experimental.*
+import org.junit.Test
+import kotlin.test.*
+
+class IODispatcherTest : TestBase() {
+ @Test
+ fun testWithIOContext() = runTest {
+ // just a very basic test that is dispatcher works and indeed uses background thread
+ val mainThread = Thread.currentThread()
+ expect(1)
+ withContext(IO) {
+ expect(2)
+ assertNotSame(mainThread, Thread.currentThread())
+ }
+ expect(3)
+ assertSame(mainThread, Thread.currentThread())
+ finish(4)
+ }
+}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/test/TestBase.kt b/core/kotlinx-coroutines-core/test/TestBase.kt
index aa5a1e6..79d66e4 100644
--- a/core/kotlinx-coroutines-core/test/TestBase.kt
+++ b/core/kotlinx-coroutines-core/test/TestBase.kt
@@ -117,12 +117,12 @@
fun initPoolsBeforeTest() {
CommonPool.usePrivatePool()
- if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).usePrivateScheduler()
+ BackgroundDispatcher.usePrivateScheduler()
}
fun shutdownPoolsAfterTest() {
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
- if (useCoroutinesScheduler) (DefaultDispatcher as ExperimentalCoroutineDispatcher).shutdown(SHUTDOWN_TIMEOUT)
+ BackgroundDispatcher.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
}