Implement yield() for experimental scheduler
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index 9b98ba4..ccea8a3 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -78,6 +78,16 @@
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
/**
+ * Dispatches execution of a runnable [block] onto another thread in the given [context]
+ * with a hint for dispatcher that current dispatch is triggered by [yield] call, so execution of this
+ * continuation may be delayed in favor of already dispatched coroutines.
+ *
+ * **Implementation note** though yield marker may be passed as a part of [context], this
+ * is a separate method for performance reasons
+ */
+ public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
+
+ /**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
*/
public override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
index 5bd1668..3ffc47a 100644
--- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Dispatched.kt
@@ -101,7 +101,7 @@
val context = continuation.context
_state = value
resumeMode = MODE_CANCELLABLE
- dispatcher.dispatch(context, this)
+ dispatcher.dispatchYield(context, this)
}
override fun toString(): String =
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index 582d4d6..653d19e 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -267,9 +267,9 @@
private fun tryUnpark(): Boolean {
while (true) {
val worker = parkedWorkersStack.pop() ?: return false
- if (!worker.registeredInParkedQueue.value) {
+ if (!worker.registeredInStack.value) {
continue // Someone else succeeded
- } else if (!worker.registeredInParkedQueue.compareAndSet(true, false)) {
+ } else if (!worker.registeredInStack.compareAndSet(true, false)) {
continue // Someone else succeeded
}
@@ -434,7 +434,8 @@
"retired workers = $retired, " +
"finished workers = $finished, " +
"running workers queues = $queueSizes, "+
- "global queue size = ${globalWorkQueue.size()}]"
+ "global queue size = ${globalWorkQueue.size()}], " +
+ "control state: ${controlState.value}"
}
// todo: make name of the pool configurable (optional parameter to CoroutineScheduler) and base thread names on it
@@ -473,7 +474,7 @@
* Worker registers itself in this queue once and will stay there until
* someone will call [Queue.poll] which return it, then this flag is reset.
*/
- val registeredInParkedQueue = atomic(false)
+ val registeredInStack = atomic(false)
var nextParkedWorker: PoolWorker? = null
/**
@@ -624,7 +625,7 @@
parkTimeNs = (parkTimeNs * 3 shr 1).coerceAtMost(MAX_PARK_TIME_NS)
}
- if (registeredInParkedQueue.compareAndSet(false, true)) {
+ if (registeredInStack.compareAndSet(false, true)) {
parkedWorkersStack.push(this)
}
@@ -636,7 +637,7 @@
private fun blockingWorkerIdle() {
tryReleaseCpu(WorkerState.PARKING)
- if (registeredInParkedQueue.compareAndSet(false, true)) {
+ if (registeredInStack.compareAndSet(false, true)) {
parkedWorkersStack.push(this)
}
@@ -671,7 +672,7 @@
* Either thread successfully deregisters itself from queue (and then terminate) or someone else
* tried to unpark it. In the latter case we should proceed as unparked worker
*/
- if (registeredInParkedQueue.value && !registeredInParkedQueue.compareAndSet(true, false)) {
+ if (registeredInStack.value && !registeredInStack.compareAndSet(true, false)) {
return
}
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
index 0cad8c6..d0a1b9a 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt
@@ -6,15 +6,15 @@
import java.util.concurrent.*
import kotlin.coroutines.experimental.*
+// TODO make internal after integration wih Ktor
class ExperimentalCoroutineDispatcher(corePoolSize: Int = Runtime.getRuntime().availableProcessors(), maxPoolSize: Int = MAX_POOL_SIZE) : CoroutineDispatcher(), Delay, Closeable {
private val coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize)
- /**
- * TODO: yield doesn't work as expected
- */
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
+ override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block, fair = true)
+
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>): Unit =
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
index d068462..7423bf8 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
@@ -12,10 +12,10 @@
"kotlinx.coroutines.scheduler.offload.threshold", 96L)
internal val BLOCKING_DEFAULT_PARALLELISM = readFromSystemProperties(
- "kotlinx.coroutines.scheduler.blocking.parallelism", 16L).toInt()
+ "kotlinx.coroutines.scheduler.blocking.parallelism", 16)
internal val MAX_POOL_SIZE = readFromSystemProperties(
- "kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128L).toInt()
+ "kotlinx.coroutines.scheduler.max.pool.size", Runtime.getRuntime().availableProcessors() * 128)
internal var schedulerTimeSource: TimeSource = NanoTimeSource
@@ -36,7 +36,9 @@
override fun nanoTime() = System.nanoTime()
}
-private fun readFromSystemProperties(propertyName: String, defaultValue: Long): Long {
+internal fun readFromSystemProperties(propertyName: String, defaultValue: Int): Int = readFromSystemProperties(propertyName, defaultValue.toLong()).toInt()
+
+internal fun readFromSystemProperties(propertyName: String, defaultValue: Long): Long {
val value = try {
System.getProperty(propertyName)
} catch (e: SecurityException) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
index a216972..265d6e9 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
@@ -27,10 +27,19 @@
*/
internal class WorkQueue {
- // todo: There is non-atomicity in computing bufferSize (indices update separately).
- // todo: It can lead to arbitrary values of resulting bufferSize.
- // todo: Consider merging both indices into a single Long.
- // todo: Alternatively, prove that sporadic arbitrary result here is Ok (does not seems the case now)
+ /*
+ * We read two independent counter here.
+ * Producer index is incremented only by owner
+ * Consumer index is incremented both by owner and external threads
+ *
+ * The only harmful race is:
+ * [T1] readProducerIndex (1) preemption(2) readConsumerIndex(5)
+ * [T2] changeProducerIndex (3)
+ * [T3] changeConsumerIndex (4)
+ *
+ * Which can lead to resulting size bigger than actual size at any moment of time.
+ * This is in general harmless because steal will be blocked by timer
+ */
internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
// TODO replace with inlined array when atomicfu will support it
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt
index aa97caa..94fdf09 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineDispatcherTest.kt
@@ -2,7 +2,6 @@
import kotlinx.coroutines.experimental.*
import org.junit.*
-import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.atomic.*
import kotlin.coroutines.experimental.*
@@ -143,8 +142,7 @@
checkPoolThreadsCreated(4)
}
- @Test(timeout = 1_000) // Failing test until yield() is not fixed
- @Ignore
+ @Test(timeout = 1_000)
fun testYield() = runBlocking {
corePoolSize = 1
maxPoolSize = 1