No Job in newSingleThreadContext and newFixedThreadPoolContext anymore
* This resolves the common issue of using `run(ctx)` where ctx comes from
either `newSingleThreadContext` or `newFixedThreadPoolContext`
invocation. They both used to return a combination of dispatcher + job,
and this job was overriding the parent job, thus preventing propagation
of cancellation. Not anymore.
* ThreadPoolDispatcher class is now public and is the result type for
both functions. It has the `close` method to release the thread pool.
Fixes #149
Fixes #151
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
index 142002a..a55dd31 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
@@ -39,9 +39,15 @@
private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
-internal abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
- abstract val executor: Executor
-
+/**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
+ /**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+ internal abstract val executor: Executor
+
override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
index c088b62..d9e1006 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -16,6 +16,7 @@
package kotlinx.coroutines.experimental
+import java.io.Closeable
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.atomic.AtomicInteger
@@ -23,27 +24,43 @@
/**
* Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
- * All continuations are dispatched immediately when invoked inside the thread of this context.
- * Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
- * The specified [name] defines the name of the new thread.
- * An optional [parent] job may be specified upon creation.
+ * **NOTE: The resulting [ThreadPoolDispatcher] owns native resources (its thread).
+ * Resources are reclaimed by [ThreadPoolDispatcher.close].**
+ *
+ * @param name the base name of the created thread.
*/
+fun newSingleThreadContext(name: String): ThreadPoolDispatcher =
+ newFixedThreadPoolContext(1, name)
+
+/**
+ * @suppress **Deprecated**: Parent job is no longer supported.
+ */
+@Deprecated(message = "Parent job is no longer supported, `close` the resulting ThreadPoolDispatcher to release resources",
+ level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("newSingleThreadContext(name)"))
fun newSingleThreadContext(name: String, parent: Job? = null): CoroutineContext =
- newFixedThreadPoolContext(1, name, parent)
+ newFixedThreadPoolContext(1, name)
/**
* Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
- * All continuations are dispatched immediately when invoked inside the threads of this context.
- * Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
- * The specified [name] defines the names of the threads.
- * An optional [parent] job may be specified upon creation.
+ * **NOTE: The resulting [ThreadPoolDispatcher] owns native resources (its threads).
+ * Resources are reclaimed by [ThreadPoolDispatcher.close].**
+ *
+ * @param nThreads the number of threads.
+ * @param name the base name of the created threads.
*/
-fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext {
+fun newFixedThreadPoolContext(nThreads: Int, name: String): ThreadPoolDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
- val job = Job(parent)
- return job + ThreadPoolDispatcher(nThreads, name, job)
+ return ThreadPoolDispatcher(nThreads, name)
}
+/**
+ * @suppress **Deprecated**: Parent job is no longer supported.
+ */
+@Deprecated(message = "Parent job is no longer supported, `close` the resulting ThreadPoolDispatcher to release resources",
+ level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("newFixedThreadPoolContext(nThreads, name)"))
+fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext =
+ newFixedThreadPoolContext(nThreads, name)
+
internal class PoolThread(
@JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
target: Runnable, name: String
@@ -51,19 +68,25 @@
init { isDaemon = true }
}
-internal class ThreadPoolDispatcher(
+/**
+ * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
+ * created with [newSingleThreadContext] and [newFixedThreadPoolContext].
+ */
+public class ThreadPoolDispatcher internal constructor(
private val nThreads: Int,
- private val name: String,
- job: Job
-) : ExecutorCoroutineDispatcherBase() {
+ private val name: String
+) : ExecutorCoroutineDispatcherBase(), Closeable {
private val threadNo = AtomicInteger()
- override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
+ internal override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
}
- init {
- job.invokeOnCompletion { executor.shutdown() }
+ /**
+ * Closes this dispatcher -- shuts down all threads in this pool and releases resources.
+ */
+ public override fun close() {
+ executor.shutdown()
}
override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt
index 228b4fc..43caf8b 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt
@@ -22,13 +22,15 @@
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
- val ctx1 = newSingleThreadContext("Ctx1")
- val ctx2 = newSingleThreadContext("Ctx2")
- runBlocking(ctx1) {
- log("Started in ctx1")
- run(ctx2) {
- log("Working in ctx2")
+ newSingleThreadContext("Ctx1").use { ctx1 ->
+ newSingleThreadContext("Ctx2").use { ctx2 ->
+ runBlocking(ctx1) {
+ log("Started in ctx1")
+ run(ctx2) {
+ log("Working in ctx2")
+ }
+ log("Back to ctx1")
+ }
}
- log("Back to ctx1")
}
}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
index 71b3f35..0ae885d 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
@@ -31,7 +31,7 @@
runBlocking(context) {
checkThreadName("TestThread")
}
- context[Job]!!.cancel()
+ context.close()
}
@Test
@@ -40,7 +40,7 @@
runBlocking(context) {
checkThreadName("TestPool")
}
- context[Job]!!.cancel()
+ context.close()
}
@Test
@@ -63,7 +63,7 @@
}
checkThreadName("Ctx1")
}
- ctx1[Job]!!.cancel()
- ctx2[Job]!!.cancel()
+ ctx1.close()
+ ctx2.close()
}
}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
index 8b087ca..220afbe 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
@@ -52,7 +52,7 @@
@After
fun tearDown() {
- pool.cancel()
+ pool.close()
}
@Test
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
index 0025df7..fbc55bc 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
@@ -35,7 +35,9 @@
val pool = newFixedThreadPoolContext(nSenders + 2, "TestStressClose")
@After
- fun tearDown() { pool[Job]!!.cancel() }
+ fun tearDown() {
+ pool.close()
+ }
@Test
fun testStressClose() = runBlocking<Unit> {