Added context parameter to CoroutineDispatcher methods, implemented Executor.toCoroutineDispatcher
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
index 36d3cbf..d91ac74 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
@@ -4,6 +4,7 @@
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.atomic.AtomicInteger
+import kotlin.coroutines.CoroutineContext
/**
* Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
@@ -11,7 +12,7 @@
* coroutine resumption is dispatched as a separate task even when it already executes inside the pool.
* When available, it wraps [ForkJoinPool.commonPool] and provides a similar shared pool where not.
*/
-object CommonPool : CoroutineDispatcher(), Yield {
+object CommonPool : CoroutineDispatcher() {
private val pool: Executor = findPool()
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
@@ -35,10 +36,6 @@
private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
- override fun isDispatchNeeded(): Boolean = true
- override fun dispatch(block: Runnable) = pool.execute(block)
-
- override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
- pool.execute { continuation.resume(Unit) }
- }
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
+ override fun dispatch(context: CoroutineContext, block: Runnable) = pool.execute(block)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index 055f6f3..78d810c 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -3,6 +3,7 @@
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
/**
* Base class that shall be extended by all coroutine dispatcher implementations.
@@ -30,12 +31,12 @@
/**
* Return `true` if execution shall be dispatched onto another thread.
*/
- public abstract fun isDispatchNeeded(): Boolean
+ public abstract fun isDispatchNeeded(context: CoroutineContext): Boolean
/**
- * Dispatches execution of a runnable [block] onto another thread.
+ * Dispatches execution of a runnable [block] onto another thread in the given [context].
*/
- public abstract fun dispatch(block: Runnable)
+ public abstract fun dispatch(context: CoroutineContext, block: Runnable)
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation<T>(this, continuation)
@@ -46,27 +47,29 @@
val continuation: Continuation<T>
): Continuation<T> by continuation {
override fun resume(value: T) {
- if (dispatcher.isDispatchNeeded())
- dispatcher.dispatch(Runnable {
- withDefaultCoroutineContext(continuation.context) {
+ val context = continuation.context
+ if (dispatcher.isDispatchNeeded(context))
+ dispatcher.dispatch(context, Runnable {
+ withDefaultCoroutineContext(context) {
continuation.resume(value)
}
})
else
- withDefaultCoroutineContext(continuation.context) {
+ withDefaultCoroutineContext(context) {
continuation.resume(value)
}
}
override fun resumeWithException(exception: Throwable) {
- if (dispatcher.isDispatchNeeded())
- dispatcher.dispatch(Runnable {
- withDefaultCoroutineContext(continuation.context) {
+ val context = continuation.context
+ if (dispatcher.isDispatchNeeded(context))
+ dispatcher.dispatch(context, Runnable {
+ withDefaultCoroutineContext(context) {
continuation.resumeWithException(exception)
}
})
else
- withDefaultCoroutineContext(continuation.context) {
+ withDefaultCoroutineContext(context) {
continuation.resumeWithException(exception)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
index 0e89f94..2de613d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
@@ -19,8 +19,8 @@
* mandating any specific threading policy.
*/
public object Here : CoroutineDispatcher() {
- override fun isDispatchNeeded(): Boolean = false
- override fun dispatch(block: Runnable) { throw UnsupportedOperationException() }
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
+ override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
}
/**
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index bd0a1f4..653c9bd 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -43,7 +43,6 @@
scheduleResumeAfterDelay(time, unit, cont)
return@sc
}
- val timeout = scheduledExecutor.schedule({ cont.resume(Unit) }, time, unit)
- cont.cancelFutureOnCompletion(timeout)
+ scheduledExecutor.scheduleResumeAfterDelay(time, unit, cont)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 66b4cba..ce35930 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -4,6 +4,7 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import java.util.concurrent.locks.LockSupport
import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
/**
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -49,9 +50,9 @@
this.parentJob = coroutine
}
- override fun isDispatchNeeded(): Boolean = Thread.currentThread() != thread
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = Thread.currentThread() != thread
- override fun dispatch(block: Runnable) {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
schedule(Dispatch(block))
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
new file mode 100644
index 0000000..6aea7d1
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
@@ -0,0 +1,32 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.Executor
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
+ */
+public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
+ ExecutorCoroutineDispatcher(this)
+
+internal open class ExecutorCoroutineDispatcher(val executor: Executor) : CoroutineDispatcher(), Delay {
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
+ override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
+
+ override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+ (executor as? ScheduledExecutorService ?: scheduledExecutor).scheduleResumeAfterDelay(time, unit, continuation)
+ }
+}
+
+internal fun ExecutorService.scheduleResume(cont: CancellableContinuation<Unit>) {
+ val future = submit { cont.resume(Unit) }
+ cont.cancelFutureOnCompletion(future)
+}
+
+internal fun ScheduledExecutorService.scheduleResumeAfterDelay(time: Long, unit: TimeUnit, cont: CancellableContinuation<Unit>) {
+ val timeout = schedule({ cont.resume(Unit) }, time, unit)
+ cont.cancelFutureOnCompletion(timeout)
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
index b456c12..dbfab01 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -5,11 +5,10 @@
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
-import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
/**
- * Creates new coroutine execution context with the a single thread and built-in [delay] support.
+ * Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
* All continuations are dispatched immediately when invoked inside the thread of this context.
* Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
* The specified [name] defines the name of the new thread.
@@ -19,7 +18,7 @@
newFixedThreadPoolContext(1, name, parent)
/**
- * Creates new coroutine execution context with the fixed-size thread-pool and built-in [delay] support.
+ * Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
* All continuations are dispatched immediately when invoked inside the threads of this context.
* Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
* The specified [name] defines the names of the threads.
@@ -27,8 +26,8 @@
*/
fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
- val lifetime = Job(parent)
- return lifetime + ThreadPoolDispatcher(nThreads, name, lifetime)
+ val job = Job(parent)
+ return job + ThreadPoolDispatcher(nThreads, name, job)
}
private val thisThreadContext = ThreadLocal<ThreadPoolDispatcher>()
@@ -37,7 +36,7 @@
nThreads: Int,
name: String,
val job: Job
-) : CoroutineDispatcher(), ContinuationInterceptor, Yield, Delay {
+) : CoroutineDispatcher(), Yield, Delay {
val threadNo = AtomicInteger()
val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
thread(start = false, isDaemon = true,
@@ -51,16 +50,15 @@
job.onCompletion { executor.shutdown() }
}
- override fun isDispatchNeeded(): Boolean = thisThreadContext.get() != this
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = thisThreadContext.get() != this
- override fun dispatch(block: Runnable) = executor.execute(block)
+ override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
- executor.execute { continuation.resume(Unit) }
+ executor.scheduleResume(continuation)
}
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
- val timeout = executor.schedule({ continuation.resume(Unit) }, time, unit)
- continuation.cancelFutureOnCompletion(timeout)
+ executor.scheduleResumeAfterDelay(time, unit, continuation)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
index 9623de3..803c9c9 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
@@ -4,7 +4,8 @@
/**
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that
- * natively support [yield] function.
+ * natively support [yield] function. It shall be implemented only by [CoroutineDispatcher]
+ * classes with non-trivial [CoroutineDispatcher.isDispatchNeeded] implementations.
*/
public interface Yield {
/**
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
new file mode 100644
index 0000000..37414d6
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
@@ -0,0 +1,85 @@
+package kotlinx.coroutines.experimental
+
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import java.util.concurrent.Executors
+
+class ExecutorsTest {
+ fun threadNames(): Set<String> {
+ val arrayOfThreads = Array<Thread?>(Thread.activeCount()) { null }
+ val n = Thread.enumerate(arrayOfThreads)
+ val names = hashSetOf<String>()
+ for (i in 0 until n)
+ names.add(arrayOfThreads[i]!!.name)
+ return names
+ }
+
+ lateinit var threadNamesBefore: Set<String>
+
+ @Before
+ fun before() {
+ threadNamesBefore = threadNames()
+ }
+
+ @After
+ fun after() {
+ // give threads some time to shutdown
+ val waitTill = System.currentTimeMillis() + 1000L
+ var diff: Set<String>
+ do {
+ val threadNamesAfter = threadNames()
+ diff = threadNamesAfter - threadNamesBefore
+ if (diff.isEmpty()) break
+ } while (System.currentTimeMillis() <= waitTill)
+ diff.forEach { println("Lost thread '$it'") }
+ check(diff.isEmpty()) { "Lost ${diff.size} threads"}
+ }
+
+ fun checkThreadName(prefix: String) {
+ val name = Thread.currentThread().name
+ check(name.startsWith(prefix)) { "Expected thread name to start with '$prefix', found: '$name'" }
+ }
+
+ @Test
+ fun testSingleThread() {
+ val context = newSingleThreadContext("TestThread")
+ runBlocking(context) {
+ checkThreadName("TestThread")
+ }
+ context[Job]!!.cancel()
+ }
+
+ @Test
+ fun testFixedThreadPool() {
+ val context = newFixedThreadPoolContext(2, "TestPool")
+ runBlocking(context) {
+ checkThreadName("TestPool")
+ }
+ context[Job]!!.cancel()
+ }
+
+ @Test
+ fun testToExecutor() {
+ val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") }
+ runBlocking(executor.toCoroutineDispatcher()) {
+ checkThreadName("TestExecutor")
+ }
+ executor.shutdown()
+ }
+
+ @Test
+ fun testTwoThreads() {
+ val ctx1 = newSingleThreadContext("Ctx1")
+ val ctx2 = newSingleThreadContext("Ctx2")
+ runBlocking(ctx1) {
+ checkThreadName("Ctx1")
+ run(ctx2) {
+ checkThreadName("Ctx2")
+ }
+ checkThreadName("Ctx1")
+ }
+ ctx1[Job]!!.cancel()
+ ctx2[Job]!!.cancel()
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
index ce26153..f12a5dc 100644
--- a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
+++ b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
@@ -11,6 +11,7 @@
import kotlinx.coroutines.experimental.javafx.JavaFx.delay
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
+import kotlin.coroutines.CoroutineContext
/**
@@ -21,8 +22,8 @@
PulseTimer().apply { start() }
}
- override fun isDispatchNeeded(): Boolean = !Platform.isFxApplicationThread()
- override fun dispatch(block: Runnable) = Platform.runLater(block)
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = !Platform.isFxApplicationThread()
+ override fun dispatch(context: CoroutineContext, block: Runnable) = Platform.runLater(block)
/**
* Suspends coroutine until next JavaFx pulse and returns time of the pulse on resumption.
diff --git a/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt b/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt
index f6694ec..30a99ca 100644
--- a/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt
+++ b/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt
@@ -7,6 +7,7 @@
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.atomic.AtomicInteger
+import kotlin.coroutines.CoroutineContext
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@@ -123,8 +124,8 @@
}
private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
- override fun isDispatchNeeded(): Boolean = true
- override fun dispatch(block: Runnable) {
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
wrapper {
block.run()
}
diff --git a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
index 8bcea9d..42ebe22 100644
--- a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
+++ b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
@@ -9,13 +9,14 @@
import java.util.concurrent.TimeUnit
import javax.swing.SwingUtilities
import javax.swing.Timer
+import kotlin.coroutines.CoroutineContext
/**
* Dispatches execution onto Swing event dispatching thread and provides native [delay] support.
*/
object Swing : CoroutineDispatcher(), Yield, Delay {
- override fun isDispatchNeeded(): Boolean = !SwingUtilities.isEventDispatchThread()
- override fun dispatch(block: Runnable) = SwingUtilities.invokeLater(block)
+ override fun isDispatchNeeded(context: CoroutineContext): Boolean = !SwingUtilities.isEventDispatchThread()
+ override fun dispatch(context: CoroutineContext, block: Runnable) = SwingUtilities.invokeLater(block)
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
SwingUtilities.invokeLater { continuation.resume(Unit) }