Implement yield() for experimental scheduler
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index 9b98ba4..ccea8a3 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -78,6 +78,16 @@
     public abstract fun dispatch(context: CoroutineContext, block: Runnable)
 
     /**
+     * Dispatches execution of a runnable [block] onto another thread in the given [context]
+     * with a hint for dispatcher that current dispatch is triggered by [yield] call, so execution of this
+     * continuation may be delayed in favor of already dispatched coroutines.
+     *
+     * **Implementation note** though yield marker may be passed as a part of [context], this
+     * is a separate method for performance reasons
+     */
+    public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
+
+    /**
      * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
      */
     public override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
index 5bd1668..3ffc47a 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
@@ -101,7 +101,7 @@
         val context = continuation.context
         _state = value
         resumeMode = MODE_CANCELLABLE
-        dispatcher.dispatch(context, this)
+        dispatcher.dispatchYield(context, this)
     }
 
     override fun toString(): String =
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index 582d4d6..653d19e 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -267,9 +267,9 @@
     private fun tryUnpark(): Boolean {
         while (true) {
             val worker = parkedWorkersStack.pop() ?: return false
-            if (!worker.registeredInParkedQueue.value) {
+            if (!worker.registeredInStack.value) {
                 continue // Someone else succeeded
-            } else if (!worker.registeredInParkedQueue.compareAndSet(true, false)) {
+            } else if (!worker.registeredInStack.compareAndSet(true, false)) {
                 continue // Someone else succeeded
             }
 
@@ -434,7 +434,8 @@
                 "retired workers = $retired, " +
                 "finished workers = $finished, " +
                 "running workers queues = $queueSizes, "+
-                "global queue size = ${globalWorkQueue.size()}]"
+                "global queue size = ${globalWorkQueue.size()}], " +
+                "control state: ${controlState.value}"
     }
 
     // todo: make name of the pool configurable (optional parameter to CoroutineScheduler) and base thread names on it
@@ -473,7 +474,7 @@
          * Worker registers itself in this queue once and will stay there until
          * someone will call [Queue.poll] which return it, then this flag is reset.
          */
-        val registeredInParkedQueue = atomic(false)
+        val registeredInStack = atomic(false)
         var nextParkedWorker: PoolWorker? = null
 
         /**
@@ -624,7 +625,7 @@
                         parkTimeNs = (parkTimeNs * 3 shr 1).coerceAtMost(MAX_PARK_TIME_NS)
                     }
 
-                    if (registeredInParkedQueue.compareAndSet(false, true)) {
+                    if (registeredInStack.compareAndSet(false, true)) {
                         parkedWorkersStack.push(this)
                     }
 
@@ -636,7 +637,7 @@
 
         private fun blockingWorkerIdle() {
             tryReleaseCpu(WorkerState.PARKING)
-            if (registeredInParkedQueue.compareAndSet(false, true)) {
+            if (registeredInStack.compareAndSet(false, true)) {
                 parkedWorkersStack.push(this)
             }
 
@@ -671,7 +672,7 @@
                  * Either thread successfully deregisters itself from queue (and then terminate) or someone else
                  * tried to unpark it. In the latter case we should proceed as unparked worker
                  */
-                if (registeredInParkedQueue.value && !registeredInParkedQueue.compareAndSet(true, false)) {
+                if (registeredInStack.value && !registeredInStack.compareAndSet(true, false)) {
                     return
                 }
 
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
index 0cad8c6..d0a1b9a 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
@@ -6,15 +6,15 @@
 import java.util.concurrent.*
 import kotlin.coroutines.experimental.*
 
+// TODO make internal after integration wih Ktor
 class ExperimentalCoroutineDispatcher(corePoolSize: Int = Runtime.getRuntime().availableProcessors(), maxPoolSize: Int = MAX_POOL_SIZE) : CoroutineDispatcher(), Delay, Closeable {
 
     private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
 
-    /**
-     * TODO: yield doesn't work as expected
-     */
     override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
 
+    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block, fair = true)
+
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
             DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
 
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
index d068462..7423bf8 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
@@ -12,10 +12,10 @@
         "kotlinx.coroutines.scheduler.offload.threshold", 96L)
 
 internal val BLOCKING_DEFAULT_PARALLELISM = readFromSystemProperties(
-        "kotlinx.coroutines.scheduler.blocking.parallelism", 16L).toInt()
+        "kotlinx.coroutines.scheduler.blocking.parallelism", 16)
 
 internal val MAX_POOL_SIZE = readFromSystemProperties(
-    "kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128L).toInt()
+    "kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128)
 
 internal var schedulerTimeSource: TimeSource = NanoTimeSource
 
@@ -36,7 +36,9 @@
     override fun nanoTime() = System.nanoTime()
 }
 
-private fun readFromSystemProperties(propertyName: String, defaultValue: Long): Long {
+internal fun readFromSystemProperties(propertyName: String, defaultValue: Int): Int = readFromSystemProperties(propertyName, defaultValue.toLong()).toInt()
+
+internal fun readFromSystemProperties(propertyName: String, defaultValue: Long): Long {
     val value = try {
         System.getProperty(propertyName)
     } catch (e: SecurityException) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
index a216972..265d6e9 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
@@ -27,10 +27,19 @@
  */
 internal class WorkQueue {
 
-    // todo: There is non-atomicity in computing bufferSize (indices update separately).
-    // todo: It can lead to arbitrary values of resulting bufferSize.
-    // todo: Consider merging both indices into a single Long.
-    // todo: Alternatively, prove that sporadic arbitrary result here is Ok (does not seems the case now)
+    /*
+     * We read two independent counter here.
+     * Producer index is incremented only by owner
+     * Consumer index is incremented both by owner and external threads
+     *
+     * The only harmful race is:
+     * [T1] readProducerIndex (1) preemption(2) readConsumerIndex(5)
+     * [T2] changeProducerIndex (3)
+     * [T3] changeConsumerIndex (4)
+     *
+     * Which can lead to resulting size bigger than actual size at any moment of time.
+     * This is in general harmless because steal will be blocked by timer
+     */
     internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
 
     // TODO replace with inlined array when atomicfu will support it
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt
index aa97caa..94fdf09 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt
@@ -2,7 +2,6 @@
 
 import kotlinx.coroutines.experimental.*
 import org.junit.*
-import org.junit.Ignore
 import org.junit.Test
 import java.util.concurrent.atomic.*
 import kotlin.coroutines.experimental.*
@@ -143,8 +142,7 @@
         checkPoolThreadsCreated(4)
     }
 
-    @Test(timeout = 1_000) // Failing test until yield() is not fixed
-    @Ignore
+    @Test(timeout = 1_000)
     fun testYield() = runBlocking {
         corePoolSize = 1
         maxPoolSize = 1