Removed wrapper from LimitingBlockingDispatcher by introducing TaskContext
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index 6aef8c8..6e94560 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -322,14 +322,14 @@
      * this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
      *
      * @param block runnable to be dispatched
-     * @param mode mode of given [block] which is used as a hint to a dynamic resizing mechanism
+     * @param taskContext concurrency context of given [block]
      * @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
      */
-    fun dispatch(block: Runnable, mode: TaskMode = TaskMode.NON_BLOCKING, fair: Boolean = false) {
+    fun dispatch(block: Runnable, taskContext: TaskContext? = null, fair: Boolean = false) {
         // TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
-        val task = Task(block, schedulerTimeSource.nanoTime(), mode)
+        val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
         // try to submit the task to the local queue and act depending on the result
-        when (submitToLocalQueue(task, mode, fair)) {
+        when (submitToLocalQueue(task, fair)) {
             ADDED -> return
             NOT_ADDED -> {
                 globalQueue.addLast(task) // offload task to local queue
@@ -452,11 +452,11 @@
     /**
      * Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
      */
-    private fun submitToLocalQueue(task: Task, mode: TaskMode, fair: Boolean): Int {
+    private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
         val worker = Thread.currentThread() as? Worker ?: return NOT_ADDED
         var result = ADDED
 
-        if (mode == TaskMode.NON_BLOCKING) {
+        if (task.mode == TaskMode.NON_BLOCKING) {
             /*
              * If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons:
              * 1) Blocking worker is finishing its block and resumes non-blocking continuation
@@ -685,23 +685,23 @@
                         wasIdle = false
                     }
                     beforeTask(task)
-                    runSafely(task.block)
+                    runSafely(task)
                     afterTask(task)
                 }
             }
             tryReleaseCpu(WorkerState.TERMINATED)
         }
 
-        private fun runSafely(block: Runnable) {
+        private fun runSafely(task: Task) {
             try {
-                block.run()
+                task.run()
             } catch (t: Throwable) {
                 uncaughtExceptionHandler.uncaughtException(this, t)
             }
         }
 
-        private fun beforeTask(job: Task) {
-            if (job.mode != TaskMode.NON_BLOCKING) {
+        private fun beforeTask(task: Task) {
+            if (task.mode != TaskMode.NON_BLOCKING) {
                 /*
                  * We should release CPU *before* checking for CPU starvation,
                  * otherwise requestCpuWorker() will not count current thread as blocking
@@ -720,7 +720,7 @@
                 return
             }
             val now = schedulerTimeSource.nanoTime()
-            if (now - job.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS &&
+            if (now - task.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS &&
                 now - lastExhaustionTime >= WORK_STEALING_TIME_RESOLUTION_NS * 5
             ) {
                 lastExhaustionTime = now
@@ -729,8 +729,8 @@
         }
 
 
-        private fun afterTask(job: Task) {
-            if (job.mode != TaskMode.NON_BLOCKING) {
+        private fun afterTask(task: Task) {
+            if (task.mode != TaskMode.NON_BLOCKING) {
                 decrementBlockingWorkers()
                 assert(state == WorkerState.BLOCKING) { "Expected BLOCKING state, but has $state" }
                 state = WorkerState.RETIRING
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
index e866d08..6a690d2 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
@@ -16,9 +16,11 @@
 ) : CoroutineDispatcher(), Delay, Closeable {
     private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
 
-    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
+    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
+        coroutineScheduler.dispatch(block)
 
-    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block, fair = true)
+    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
+        coroutineScheduler.dispatch(block, fair = true)
 
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
             DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
@@ -38,17 +40,18 @@
      */
     fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
         require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
-        return LimitingBlockingDispatcher(parallelism, TaskMode.PROBABLY_BLOCKING, this)
+        return LimitingBlockingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
     }
 
-    internal fun dispatchBlocking(block: Runnable, context: TaskMode, fair: Boolean): Unit = coroutineScheduler.dispatch(block, context, fair)
+    internal fun dispatchWithContext(block: Runnable, context: TaskContext?, fair: Boolean): Unit =
+        coroutineScheduler.dispatch(block, context, fair)
 }
 
 private class LimitingBlockingDispatcher(
+    val dispatcher: ExperimentalCoroutineDispatcher,
     val parallelism: Int,
-    val taskContext: TaskMode,
-    val dispatcher: ExperimentalCoroutineDispatcher
-) : CoroutineDispatcher(), Delay {
+    override val taskMode: TaskMode
+) : CoroutineDispatcher(), Delay, TaskContext {
 
     private val queue = ConcurrentLinkedQueue<Runnable>()
     private val inFlightTasks = atomic(0)
@@ -56,14 +59,14 @@
     override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
 
     private fun dispatch(block: Runnable, fair: Boolean) {
-        var taskToSchedule = wrap(block)
+        var taskToSchedule = block
         while (true) {
             // Commit in-flight tasks slot
             val inFlight = inFlightTasks.incrementAndGet()
 
             // Fast path, if parallelism limit is not reached, dispatch task and return
             if (inFlight <= parallelism) {
-                dispatcher.dispatchBlocking(taskToSchedule, taskContext, fair)
+                dispatcher.dispatchWithContext(taskToSchedule, this, fair)
                 return
             }
 
@@ -97,10 +100,6 @@
         return "${super.toString()}[dispatcher = $dispatcher]"
     }
 
-    private fun wrap(block: Runnable): Runnable {
-        return block as? WrappedTask ?: WrappedTask(block)
-    }
-
     /**
      * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
      *
@@ -114,11 +113,11 @@
      * ```
      * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
      */
-    private fun afterTask() {
+    override fun afterTask() {
         var next = queue.poll()
         // If we have pending tasks in current blocking context, dispatch first
         if (next != null) {
-            dispatcher.dispatchBlocking(next, taskContext, true)
+            dispatcher.dispatchWithContext(next, this, true)
             return
         }
         inFlightTasks.decrementAndGet()
@@ -138,16 +137,6 @@
         dispatch(next, true)
     }
 
-    private inner class WrappedTask(val runnable: Runnable) : Runnable {
-        override fun run() {
-            try {
-                runnable.run()
-            } finally {
-                afterTask()
-            }
-        }
-    }
-
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
         dispatcher.scheduleResumeAfterDelay(time, unit, continuation)
 }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
index 561a3bb..2505986 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
@@ -43,13 +43,32 @@
     PROBABLY_BLOCKING,
 }
 
+internal interface TaskContext {
+    val taskMode: TaskMode
+    fun afterTask()
+}
+
 internal class Task(
-    val block: Runnable,
-    val submissionTime: Long,
-    val mode: TaskMode
-) : LockFreeMPMCQueueNode<Task>() {
+    @JvmField val block: Runnable,
+    @JvmField val submissionTime: Long,
+    @JvmField val taskContext: TaskContext?
+) : Runnable, LockFreeMPMCQueueNode<Task>() {
+    val mode: TaskMode get() = taskContext?.taskMode ?: TaskMode.NON_BLOCKING
+    
+    override fun run() {
+        if (taskContext == null) {
+            block.run()
+        } else {
+            try {
+                block.run()
+            } finally {
+                taskContext.afterTask()
+            }
+        }
+    }
+
     override fun toString(): String =
-        "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $mode]"
+        "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]"
 }
 
 // Open for tests
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
index 4cff7d2..e11d597 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
@@ -9,11 +9,11 @@
     @Test
     fun testModesExternalSubmission() { // Smoke
         CoroutineScheduler(1, 1).use {
-            for (value in TaskMode.values()) {
+            for (mode in TaskMode.values()) {
                 val latch = CountDownLatch(1)
                 it.dispatch(Runnable {
                     latch.countDown()
-                }, mode = value)
+                }, TaskContextImpl(mode))
 
                 latch.await()
             }
@@ -25,10 +25,10 @@
         CoroutineScheduler(2, 2).use {
             val latch = CountDownLatch(1)
             it.dispatch(Runnable {
-                for (value in TaskMode.values()) {
+                for (mode in TaskMode.values()) {
                     it.dispatch(Runnable {
                         latch.countDown()
-                    }, mode = value)
+                    }, TaskContextImpl(mode))
                 }
             })
 
@@ -126,4 +126,8 @@
             check(ratio >= 0.9)
         }
     }
+
+    private class TaskContextImpl(override val taskMode: TaskMode) : TaskContext {
+        override fun afterTask() {}
+    }
 }
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
index 2c9fd55..ef371e9 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
@@ -126,7 +126,7 @@
     return result
 }
 
-internal fun task(n: Long) = Task(Runnable {}, n, TaskMode.NON_BLOCKING)
+internal fun task(n: Long) = Task(Runnable {}, n, null)
 
 internal fun WorkQueue.drain(): List<Long> {
     var task: Task? = poll()