Removed wrapper from LimitingBlockingDispatcher by introducing TaskContext
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index 6aef8c8..6e94560 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -322,14 +322,14 @@
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
*
* @param block runnable to be dispatched
- * @param mode mode of given [block] which is used as a hint to a dynamic resizing mechanism
+ * @param taskContext concurrency context of given [block]
* @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
*/
- fun dispatch(block: Runnable, mode: TaskMode = TaskMode.NON_BLOCKING, fair: Boolean = false) {
+ fun dispatch(block: Runnable, taskContext: TaskContext? = null, fair: Boolean = false) {
// TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
- val task = Task(block, schedulerTimeSource.nanoTime(), mode)
+ val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
// try to submit the task to the local queue and act depending on the result
- when (submitToLocalQueue(task, mode, fair)) {
+ when (submitToLocalQueue(task, fair)) {
ADDED -> return
NOT_ADDED -> {
globalQueue.addLast(task) // offload task to local queue
@@ -452,11 +452,11 @@
/**
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
*/
- private fun submitToLocalQueue(task: Task, mode: TaskMode, fair: Boolean): Int {
+ private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
val worker = Thread.currentThread() as? Worker ?: return NOT_ADDED
var result = ADDED
- if (mode == TaskMode.NON_BLOCKING) {
+ if (task.mode == TaskMode.NON_BLOCKING) {
/*
* If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons:
* 1) Blocking worker is finishing its block and resumes non-blocking continuation
@@ -685,23 +685,23 @@
wasIdle = false
}
beforeTask(task)
- runSafely(task.block)
+ runSafely(task)
afterTask(task)
}
}
tryReleaseCpu(WorkerState.TERMINATED)
}
- private fun runSafely(block: Runnable) {
+ private fun runSafely(task: Task) {
try {
- block.run()
+ task.run()
} catch (t: Throwable) {
uncaughtExceptionHandler.uncaughtException(this, t)
}
}
- private fun beforeTask(job: Task) {
- if (job.mode != TaskMode.NON_BLOCKING) {
+ private fun beforeTask(task: Task) {
+ if (task.mode != TaskMode.NON_BLOCKING) {
/*
* We should release CPU *before* checking for CPU starvation,
* otherwise requestCpuWorker() will not count current thread as blocking
@@ -720,7 +720,7 @@
return
}
val now = schedulerTimeSource.nanoTime()
- if (now - job.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS &&
+ if (now - task.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS &&
now - lastExhaustionTime >= WORK_STEALING_TIME_RESOLUTION_NS * 5
) {
lastExhaustionTime = now
@@ -729,8 +729,8 @@
}
- private fun afterTask(job: Task) {
- if (job.mode != TaskMode.NON_BLOCKING) {
+ private fun afterTask(task: Task) {
+ if (task.mode != TaskMode.NON_BLOCKING) {
decrementBlockingWorkers()
assert(state == WorkerState.BLOCKING) { "Expected BLOCKING state, but has $state" }
state = WorkerState.RETIRING
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
index e866d08..6a690d2 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
@@ -16,9 +16,11 @@
) : CoroutineDispatcher(), Delay, Closeable {
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
- override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
+ override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
+ coroutineScheduler.dispatch(block)
- override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block, fair = true)
+ override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
+ coroutineScheduler.dispatch(block, fair = true)
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
@@ -38,17 +40,18 @@
*/
fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
- return LimitingBlockingDispatcher(parallelism, TaskMode.PROBABLY_BLOCKING, this)
+ return LimitingBlockingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
}
- internal fun dispatchBlocking(block: Runnable, context: TaskMode, fair: Boolean): Unit = coroutineScheduler.dispatch(block, context, fair)
+ internal fun dispatchWithContext(block: Runnable, context: TaskContext?, fair: Boolean): Unit =
+ coroutineScheduler.dispatch(block, context, fair)
}
private class LimitingBlockingDispatcher(
+ val dispatcher: ExperimentalCoroutineDispatcher,
val parallelism: Int,
- val taskContext: TaskMode,
- val dispatcher: ExperimentalCoroutineDispatcher
-) : CoroutineDispatcher(), Delay {
+ override val taskMode: TaskMode
+) : CoroutineDispatcher(), Delay, TaskContext {
private val queue = ConcurrentLinkedQueue<Runnable>()
private val inFlightTasks = atomic(0)
@@ -56,14 +59,14 @@
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, fair: Boolean) {
- var taskToSchedule = wrap(block)
+ var taskToSchedule = block
while (true) {
// Commit in-flight tasks slot
val inFlight = inFlightTasks.incrementAndGet()
// Fast path, if parallelism limit is not reached, dispatch task and return
if (inFlight <= parallelism) {
- dispatcher.dispatchBlocking(taskToSchedule, taskContext, fair)
+ dispatcher.dispatchWithContext(taskToSchedule, this, fair)
return
}
@@ -97,10 +100,6 @@
return "${super.toString()}[dispatcher = $dispatcher]"
}
- private fun wrap(block: Runnable): Runnable {
- return block as? WrappedTask ?: WrappedTask(block)
- }
-
/**
* Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
*
@@ -114,11 +113,11 @@
* ```
* it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
*/
- private fun afterTask() {
+ override fun afterTask() {
var next = queue.poll()
// If we have pending tasks in current blocking context, dispatch first
if (next != null) {
- dispatcher.dispatchBlocking(next, taskContext, true)
+ dispatcher.dispatchWithContext(next, this, true)
return
}
inFlightTasks.decrementAndGet()
@@ -138,16 +137,6 @@
dispatch(next, true)
}
- private inner class WrappedTask(val runnable: Runnable) : Runnable {
- override fun run() {
- try {
- runnable.run()
- } finally {
- afterTask()
- }
- }
- }
-
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
dispatcher.scheduleResumeAfterDelay(time, unit, continuation)
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
index 561a3bb..2505986 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
@@ -43,13 +43,32 @@
PROBABLY_BLOCKING,
}
+internal interface TaskContext {
+ val taskMode: TaskMode
+ fun afterTask()
+}
+
internal class Task(
- val block: Runnable,
- val submissionTime: Long,
- val mode: TaskMode
-) : LockFreeMPMCQueueNode<Task>() {
+ @JvmField val block: Runnable,
+ @JvmField val submissionTime: Long,
+ @JvmField val taskContext: TaskContext?
+) : Runnable, LockFreeMPMCQueueNode<Task>() {
+ val mode: TaskMode get() = taskContext?.taskMode ?: TaskMode.NON_BLOCKING
+
+ override fun run() {
+ if (taskContext == null) {
+ block.run()
+ } else {
+ try {
+ block.run()
+ } finally {
+ taskContext.afterTask()
+ }
+ }
+ }
+
override fun toString(): String =
- "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $mode]"
+ "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]"
}
// Open for tests
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
index 4cff7d2..e11d597 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
@@ -9,11 +9,11 @@
@Test
fun testModesExternalSubmission() { // Smoke
CoroutineScheduler(1, 1).use {
- for (value in TaskMode.values()) {
+ for (mode in TaskMode.values()) {
val latch = CountDownLatch(1)
it.dispatch(Runnable {
latch.countDown()
- }, mode = value)
+ }, TaskContextImpl(mode))
latch.await()
}
@@ -25,10 +25,10 @@
CoroutineScheduler(2, 2).use {
val latch = CountDownLatch(1)
it.dispatch(Runnable {
- for (value in TaskMode.values()) {
+ for (mode in TaskMode.values()) {
it.dispatch(Runnable {
latch.countDown()
- }, mode = value)
+ }, TaskContextImpl(mode))
}
})
@@ -126,4 +126,8 @@
check(ratio >= 0.9)
}
}
+
+ private class TaskContextImpl(override val taskMode: TaskMode) : TaskContext {
+ override fun afterTask() {}
+ }
}
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
index 2c9fd55..ef371e9 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
@@ -126,7 +126,7 @@
return result
}
-internal fun task(n: Long) = Task(Runnable {}, n, TaskMode.NON_BLOCKING)
+internal fun task(n: Long) = Task(Runnable {}, n, null)
internal fun WorkQueue.drain(): List<Long> {
var task: Task? = poll()