Scheduler improvements:

Fix (one more) ABA problem in tryUnpark
Global TaskContext to speed up CPU tasks
Documentation improvements
Stress test constants tuned to be more robust and faster on a local machine (and when run from IDEA)
Style fixes
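
For background on the tryUnpark ABA fix: the CoroutineScheduler diff below packs the parked-workers stack top into a single long that carries both a worker index and a version counter, and every update bumps the version. The sketch below is an editorial illustration of that idea only, not the scheduler's code; the name StampedIndexStack, the next-array layout and the 21-bit split are assumptions chosen for the example, and a plain AtomicLong stands in for the project's atomic field wrappers.

    import java.util.concurrent.atomic.AtomicLong

    // Treiber-style stack of worker indices with a versioned ("stamped") top.
    // Because every successful update bumps the version bits, a CAS taken from
    // a stale snapshot fails even if the same index is back on top, which is
    // the ABA scenario the commit message refers to.
    class StampedIndexStack(capacity: Int) {
        private val indexMask = (1L shl 21) - 1   // low 21 bits: worker index, 0 == empty
        private val versionInc = 1L shl 21        // +1 in the version bits
        private val versionMask = indexMask.inv()
        private val next = IntArray(capacity)     // next[i] = index below worker i
        private val top = AtomicLong(0L)          // packed [version | index]

        fun push(index: Int) {
            while (true) {
                val cur = top.get()
                next[index] = (cur and indexMask).toInt()
                val upd = ((cur + versionInc) and versionMask) or index.toLong()
                if (top.compareAndSet(cur, upd)) return
            }
        }

        fun popIndex(): Int {
            while (true) {
                val cur = top.get()
                val index = (cur and indexMask).toInt()
                if (index == 0) return 0          // stack is empty
                val upd = ((cur + versionInc) and versionMask) or next[index].toLong()
                if (top.compareAndSet(cur, upd)) return index
            }
        }
    }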
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeMPMCQueue.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeMPMCQueue.kt
index c934f86..7916e6b 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeMPMCQueue.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeMPMCQueue.kt
@@ -23,6 +23,7 @@
         atomic(LockFreeMPMCQueueNode<T>() as T) // sentinel
 
     private val tail = atomic(head.value)
+    internal val headValue: T get() = head.value
 
     public fun addLast(node: T): Boolean {
         tail.loop { curTail ->
@@ -47,8 +48,7 @@
         }
     }
 
-    @PublishedApi internal val headValue: T get() = head.value
-    @PublishedApi internal fun headCas(curHead: T, update: T) = head.compareAndSet(curHead, update)
+    fun headCas(curHead: T, update: T) = head.compareAndSet(curHead, update)
 
     public inline fun removeFistOrNullIf(predicate: (T) -> Boolean): T? {
         while (true) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index a99d496..f46f603 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -9,7 +9,7 @@
 import java.util.concurrent.locks.*
 
 /**
- * Coroutine scheduler (pool of shared threads) which primary target is to distribute dispatched coroutine over worker threads,
+ * Coroutine scheduler (pool of shared threads) whose primary target is to distribute dispatched coroutines over worker threads,
  * including both CPU-intensive and blocking tasks.
  *
  * Current scheduler implementation has two optimization targets:
@@ -27,7 +27,7 @@
  *
  * When a coroutine is dispatched from within scheduler worker, it's placed into the head of worker run queue.
  * If the head is not empty, the task from the head is moved to the tail. Though it is unfair scheduling policy,
- * it couples communicating coroutines into one and eliminates scheduling latency that arises from placing task in the end of the queue.
+ * it effectively couples communicating coroutines into one and eliminates scheduling latency that arises from placing the task at the end of the queue.
  * Placing former head to the tail is necessary to provide semi-FIFO order, otherwise queue degenerates to stack.
  * When a coroutine is dispatched from an external thread, it's put into the global queue.
  *
@@ -70,7 +70,7 @@
             "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
         }
         require(idleWorkerKeepAliveNs > 0) {
-            "Idle worker keep alive time $idleWorkerKeepAliveNs must be postiive"
+            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
         }
     }
 
@@ -107,18 +107,21 @@
      *
      * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
      */
-    private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int)  {
-        require(oldIndex > 0 && newIndex >= 0)
+    private fun parkedWorkersStackTopUpdate(oldIndex: Int, newIndex: Int) {
         parkedWorkersStack.loop { top ->
             val index = (top and PARKED_INDEX_MASK).toInt()
             val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
             val updIndex = if (index == oldIndex) {
-                if (newIndex == 0)
+                if (newIndex == 0) {
                     parkedWorkersStackNextIndex(workers[oldIndex]!!)
-                else
+                }
+                else {
                     newIndex
-            } else
+                }
+            } else {
                 index // no change to index, but update version
+            }
+
             if (updIndex < 0) return@loop // retry
             if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
         }
@@ -132,7 +135,6 @@
      * See [Worker.doPark].
      */
     private fun parkedWorkersStackPush(worker: Worker) {
-        // assert(worker.isParked)
         if (worker.nextParkedWorker !== NOT_IN_STACK) return // already in stack, bail out
         /*
          * The below loop can be entered only if this worker was not in the stack and, since no other thread
@@ -172,7 +174,7 @@
              * Successful CAS of the stack top completes successful pop.
              */
             if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
-                /**
+                /*
                  * We've just taken the worker out of the stack, but nextParkedWorker is not reset yet, so if a worker is
                  * currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
                  * adding itself again. It does not matter, since we are going to invoke unpark on the thread
@@ -213,8 +215,11 @@
      * [workers] is array of lazily created workers up to [maxPoolSize] workers.
      * [createdWorkers] is count of already created workers (worker with index lesser than [createdWorkers] exists).
      * [blockingWorkers] is count of running workers which are executing [TaskMode.PROBABLY_BLOCKING] task.
+     * All mutations of the array's content are guarded by the lock.
      *
-     * **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value).
+     * **NOTE**: `workers[0]` is always `null` (never used, works as a sentinel value), so
+     * workers are 1-indexed; this keeps the code path in [Worker.trySteal] a bit faster and lets
+     * the index swap during termination work properly
      */
     private val workers: Array<Worker?> = arrayOfNulls(maxPoolSize + 1)
 
@@ -323,7 +328,7 @@
      * @param taskContext concurrency context of given [block]
      * @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
      */
-    fun dispatch(block: Runnable, taskContext: TaskContext? = null, fair: Boolean = false) {
+    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
         // TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
         val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
         // try to submit the task to the local queue and act depending on the result
@@ -455,7 +460,6 @@
         if (worker.scheduler !== this) return NOT_ADDED // different scheduler's worker (!!!)
 
         var result = ADDED
-
         if (task.mode == TaskMode.NON_BLOCKING) {
             /*
              * If the worker is currently executing a blocking task and tries to dispatch a non-blocking task, it's for one of the following reasons:
@@ -602,7 +606,7 @@
          * It is set to the termination deadline when the worker starts doing [blockingWorkerIdle] and is reset
          * when there is a task. It serves as protection against spurious wakeups of parkNanos.
          */
-        var terminationDeadline = 0L
+        private var terminationDeadline = 0L
 
         /**
          * Reference to the next worker in the [parkedWorkersStack].
@@ -613,17 +617,17 @@
         var nextParkedWorker: Any? = NOT_IN_STACK
 
         /**
-         * Tries to set [terminationState] to [FORBIDDEN] and returns `false` if the worker is
-         * already [TERMINATED].
+         * Tries to set [terminationState] to [FORBIDDEN] and returns `false` if this attempt fails.
+         * The attempt may fail either because the worker terminated itself or because someone else
+         * claimed this worker (the latter is rare, because it requires very bad timings)
          */
-        fun tryForbidTermination(): Boolean {
-            terminationState.loop {state ->
-                when(state) {
-                    TERMINATED -> return false // already terminated
-                    FORBIDDEN -> return true // already forbidden
-                    ALLOWED -> if (terminationState.compareAndSet(ALLOWED, FORBIDDEN)) return true
-                    else -> error("Invalid terminationState = $state")
-                }
+        fun tryForbidTermination(): Boolean {
+            val state = terminationState.value
+            return when (state) {
+                TERMINATED -> false // already terminated
+                FORBIDDEN -> false // already forbidden, someone else claimed this worker
+                ALLOWED -> terminationState.compareAndSet(ALLOWED, FORBIDDEN)
+                else -> error("Invalid terminationState = $state")
             }
         }
 
@@ -730,7 +734,6 @@
             }
         }
 
-
         private fun afterTask(task: Task) {
             if (task.mode != TaskMode.NON_BLOCKING) {
                 decrementBlockingWorkers()
@@ -852,9 +855,9 @@
         }
 
         /**
-         * Method checks whether new blocking tasks arrived to pool when worker decided
+         * Checks whether new blocking tasks arrived at the pool when the worker decided
          * it can go to deep park/termination and puts recently arrived task to its local queue.
-         * Returns `true` if there is no blocking task in queue.
+         * Returns `true` if there are no blocking tasks in the queue.
          */
         private fun blockingQuiescence(): Boolean {
             globalQueue.removeFirstBlockingModeOrNull()?.let {
@@ -914,6 +917,8 @@
             val created = createdWorkers
             // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
             if (created < 2) return null
+
+            // TODO to guarantee quiescence it's probably worth doing a full scan
             var stealIndex = lastStealIndex
             if (stealIndex == 0) stealIndex = nextInt(created) // start with random steal index
             stealIndex++ // then go sequentially
@@ -930,22 +935,26 @@
     }
 
     enum class WorkerState {
-        /*
-         * Has CPU token and either executes [TaskMode.NON_BLOCKING] task or tries to steal one (~in busy wait).
+        /**
+         * Has CPU token and either executes [TaskMode.NON_BLOCKING] task or tries to steal one.
          */
         CPU_ACQUIRED,
+
         /**
          * Executing task with [TaskMode.PROBABLY_BLOCKING].
          */
         BLOCKING,
+
         /**
          * Currently parked.
          */
         PARKING,
+
         /**
          * Tries to execute its local work and then goes to infinite sleep as no longer needed worker.
          */
         RETIRING,
+
         /**
          * Terminal state, will no longer be used
          */
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
index 3eaa5e5..e9000ea 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
@@ -54,7 +54,7 @@
         return LimitingBlockingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
     }
 
-    internal fun dispatchWithContext(block: Runnable, context: TaskContext?, fair: Boolean): Unit =
+    internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit =
         coroutineScheduler.dispatch(block, context, fair)
 
     // for tests only
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
index 9b4cd54..0b62465 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
@@ -42,9 +42,15 @@
 internal var schedulerTimeSource: TimeSource = NanoTimeSource
 
 internal enum class TaskMode {
-    // Marker indicating that task is CPU-bound and will not block
+
+    /**
+     * Marker indicating that task is CPU-bound and will not block
+     */
     NON_BLOCKING,
-    // Marker indicating that task may potentially block, thus giving scheduler a hint that additional thread may be required
+
+    /**
+     * Marker indicating that task may potentially block, thus giving scheduler a hint that additional thread may be required
+     */
     PROBABLY_BLOCKING,
 }
 
@@ -53,22 +59,26 @@
     fun afterTask()
 }
 
+internal object NonBlockingContext : TaskContext {
+    override val taskMode: TaskMode = TaskMode.NON_BLOCKING
+
+    override fun afterTask() {
+        // Nothing for non-blocking context
+    }
+}
+
 internal class Task(
     @JvmField val block: Runnable,
     @JvmField val submissionTime: Long,
-    @JvmField val taskContext: TaskContext?
+    @JvmField val taskContext: TaskContext
 ) : Runnable, LockFreeMPMCQueueNode<Task>() {
-    val mode: TaskMode get() = taskContext?.taskMode ?: TaskMode.NON_BLOCKING
+    val mode: TaskMode get() = taskContext.taskMode
     
     override fun run() {
-        if (taskContext == null) {
+        try {
             block.run()
-        } else {
-            try {
-                block.run()
-            } finally {
-                taskContext.afterTask()
-            }
+        } finally {
+            taskContext.afterTask()
         }
     }
 
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
index 7aa37b2..faebb1c 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
@@ -87,8 +87,8 @@
     }
 
     /**
-     * Tries stealing from [victim] queue into this queue, using [globalQueue] to offload on overflow.
-     * 
+     * Tries stealing from [victim] queue into this queue, using [globalQueue] to offload stolen tasks in case this queue overflows.
+     *
      * @return whether any task was stolen
      */
     fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Boolean {
@@ -119,6 +119,7 @@
         if (time - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION_NS) {
             return false
         }
+
         if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
             add(lastScheduled, globalQueue)
             return true
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt
index db1110e..b79db78 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherRaceStressTest.kt
@@ -2,15 +2,22 @@
 
 import kotlinx.coroutines.experimental.*
 import org.junit.*
+import java.util.concurrent.*
 import java.util.concurrent.atomic.*
 
 class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
     private val concurrentWorkers = AtomicInteger(0)
 
+    @Before
+    fun setUp() {
+        // In case of starvation, the test will hang
+        idleWorkerKeepAliveNs = Long.MAX_VALUE
+    }
+
     @Test
     fun testAddPollRace() = runBlocking {
         val limitingDispatcher = blockingDispatcher(1)
-        val iterations = 100_000 * stressTestMultiplier
+        val iterations = 25_000 * stressTestMultiplier
         // Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
         for (i in 1..iterations) {
             val tasks = (1..2).map {
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherStressTest.kt
index 7a5dd65..8304ccf 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/BlockingCoroutineDispatcherStressTest.kt
@@ -5,7 +5,7 @@
 import java.util.concurrent.*
 import java.util.concurrent.atomic.*
 
-class BlockingCoroutineDispatcherStressTest() : SchedulerTestBase() {
+class BlockingCoroutineDispatcherStressTest : SchedulerTestBase() {
 
     init {
         corePoolSize = CORES_COUNT
@@ -84,7 +84,7 @@
     @Test
     fun testBlockingTasksStarvation() = runBlocking {
         corePoolSize = 2 // Easier to reproduce race with unparks
-        val iterations = 50_000 * stressTestMultiplier
+        val iterations = 10_000 * stressTestMultiplier
         val blockingLimit = 4 // CORES_COUNT * 3
         val blocking = blockingDispatcher(blockingLimit)
 
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt
index bf2e6ba..dd69f32 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt
@@ -13,7 +13,7 @@
 
     private var dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher()
     private val observedThreads = ConcurrentHashMap<Thread, Long>()
-    private val tasksNum = 4_000_000 * stressTestMultiplier
+    private val tasksNum = 2_000_000 * stressTestMultiplier
     private val processed = AtomicInteger(0)
     private val finishLatch = CountDownLatch(1)
 
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt
index a9d6a27..fbbb720 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/SchedulerTestBase.kt
@@ -37,14 +37,6 @@
             require(threads in range) { "Expected threads in $range interval, but has $threads" }
         }
 
-        /**
-         * Asserts that [expectedThreadsCount] of pool worker threads exists at the time of method invocation
-         */
-        fun checkPoolThreadsExist(expectedThreadsCount: Int = CORES_COUNT) {
-            val threads = Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.Worker }.count()
-            require(threads == expectedThreadsCount) { "Expected $expectedThreadsCount threads, but has $threads" }
-        }
-
         private fun maxSequenceNumber(): Int? {
             return Thread.getAllStackTraces().keys.filter { it is CoroutineScheduler.Worker }
                 .map { sequenceNumber(it.name) }.max()
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt
index bfc57c7..e54b82b 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt
@@ -11,7 +11,7 @@
 class WorkQueueStressTest : TestBase() {
 
     private val threads = mutableListOf<Thread>()
-    private val offerIterations = 100_000 * stressTestMultiplier
+    private val offerIterations = 100_000 * stressTestMultiplierSqrt // memory pressure, not CPU time
     private val stealersCount = 6
     private val stolenTasks = Array(stealersCount) { Queue() }
     private val globalQueue = Queue() // only producer will use it
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
index ef371e9..216d93a 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt
@@ -126,7 +126,7 @@
     return result
 }
 
-internal fun task(n: Long) = Task(Runnable {}, n, null)
+internal fun task(n: Long) = Task(Runnable {}, n, NonBlockingContext)
 
 internal fun WorkQueue.drain(): List<Long> {
     var task: Task? = poll()