Renamed ScheduledExecutor to DefaultExecutor;
consistently schedule coroutines to it on any failure in all other
executors (on RejectedExecutionException, etc)
diff --git a/coroutines-guide.md b/coroutines-guide.md
index b2be612..abbd614 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -816,14 +816,14 @@
```text
'Unconfined': I'm working in thread main
'context': I'm working in thread main
- 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor
+ 'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'context': After delay in thread main
```
<!--- TEST LINES_START -->
So, the coroutine that had inherited `context` of `runBlocking {...}` continues to execute in the `main` thread,
-while the unconfined one had resumed in the scheduler thread that [delay] function is using.
+while the unconfined one had resumed in the default executor thread that [delay] function is using.
### Debugging coroutines and threads
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
index 6198492..56e6b27 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
@@ -18,6 +18,7 @@
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
+import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.experimental.CoroutineContext
@@ -62,7 +63,8 @@
_pool ?: createPool().also { _pool = it }
override fun dispatch(context: CoroutineContext, block: Runnable) =
- (_pool ?: getOrCreatePoolSync()).execute(block)
+ try { (_pool ?: getOrCreatePoolSync()).execute(block) }
+ catch (e: RejectedExecutionException) { defaultExecutor.execute(block) }
// used for tests
@Synchronized
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt
new file mode 100644
index 0000000..97a6482
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.ScheduledThreadPoolExecutor
+import java.util.concurrent.TimeUnit
+
+private const val DEFAULT_KEEP_ALIVE = 1000L
+
+private val KEEP_ALIVE =
+ try { java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) }
+ catch (e: SecurityException) { DEFAULT_KEEP_ALIVE }
+
+@Volatile
+private var _executor: ScheduledExecutorService? = null
+
+internal val defaultExecutor: ScheduledExecutorService
+ get() = _executor ?: getOrCreateExecutorSync()
+
+@Synchronized
+private fun getOrCreateExecutorSync(): ScheduledExecutorService =
+ _executor ?: ScheduledThreadPoolExecutor(1) { r ->
+ Thread(r, "kotlinx.coroutines.DefaultExecutor").apply { isDaemon = true }
+ }.apply {
+ setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS)
+ allowCoreThreadTimeOut(true)
+ executeExistingDelayedTasksAfterShutdownPolicy = false
+ // "setRemoveOnCancelPolicy" is available only since JDK7, so try it via reflection
+ try {
+ val m = this::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.javaPrimitiveType)
+ m.invoke(this, true)
+ } catch (ex: Throwable) { /* ignore */ }
+ _executor = this
+ }
+
+// used for tests
+@Synchronized
+internal fun defaultExecutorShutdownNow() {
+ _executor?.shutdownNow()
+}
+
+@Synchronized
+internal fun defaultExecutorShutdownNowAndRelease() {
+ _executor?.apply {
+ shutdownNow()
+ _executor = null
+ }
+}
+
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index c853a9f..b5d414a 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -67,7 +67,7 @@
* This implementation uses a built-in single-threaded scheduled executor service.
*/
fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
- DisposableFutureHandle(scheduledExecutor.schedule(block, time, unit))
+ DisposableFutureHandle(defaultExecutor.schedule(block, time, unit))
}
/**
@@ -88,7 +88,7 @@
val delay = cont.context[ContinuationInterceptor] as? Delay
if (delay != null)
delay.scheduleResumeAfterDelay(time, unit, cont) else
- cont.cancelFutureOnCompletion(scheduledExecutor.schedule(ResumeRunnable(cont), time, unit))
+ cont.cancelFutureOnCompletion(defaultExecutor.schedule(ResumeRunnable(cont), time, unit))
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 2213877..7be7633 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -83,7 +83,8 @@
// todo: we should unpark only when this task became first in the queue
unpark()
} else {
- block.run() // otherwise run it right here (as if Unconfined)
+ // otherwise submit to a default executor
+ defaultExecutor.execute(block)
}
}
@@ -92,7 +93,8 @@
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
- scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit) // otherwise reschedule to other time pool
+ // otherwise schedule to a default executor
+ defaultExecutor.schedule(ResumeRunnable(continuation), time, unit)
}
}
@@ -103,7 +105,8 @@
unpark()
return delayedTask
}
- return DisposableFutureHandle(scheduledExecutor.schedule(block, time, unit))
+ // otherwise schedule to a default executor
+ return DisposableFutureHandle(defaultExecutor.schedule(block, time, unit))
}
override fun processNextEvent(): Long {
@@ -129,19 +132,14 @@
fun shutdown() {
assert(!isActive)
+ assert(Thread.currentThread() === thread)
// complete processing of all queued tasks
- while (true) {
- val queuedTask = (queue.removeFirstOrNull() ?: break) as QueuedTask
- queuedTask.run()
- }
- // reschedule or execute delayed tasks
+ while (processNextEvent() <= 0) { /* spin */ }
+ // reschedule the rest of delayed tasks
+ val now = System.nanoTime()
while (true) {
val delayedTask = delayed.removeFirst() ?: break
- val now = System.nanoTime()
- if (delayedTask.timeToExecute(now))
- delayedTask.run()
- else
- delayedTask.rescheduleOnShutdown(now)
+ delayedTask.rescheduleOnShutdown(now)
}
}
@@ -196,7 +194,11 @@
fun rescheduleOnShutdown(now: Long) = synchronized(delayed) {
if (delayed.remove(this)) {
assert (scheduledAfterShutdown == null)
- scheduledAfterShutdown = scheduledExecutor.schedule(this, nanoTime - now, TimeUnit.NANOSECONDS)
+ val remaining = nanoTime - now
+ scheduledAfterShutdown =
+ if (remaining > 0)
+ defaultExecutor.schedule(this, remaining, TimeUnit.NANOSECONDS)
+ else defaultExecutor.submit(this)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
index caafc08..1f78c8f 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
@@ -17,6 +17,7 @@
package kotlinx.coroutines.experimental
import java.util.concurrent.Executor
+import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.Continuation
@@ -42,19 +43,25 @@
internal abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
abstract val executor: Executor
- override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
+ override fun dispatch(context: CoroutineContext, block: Runnable) =
+ try { executor.execute(block) }
+ catch (e: RejectedExecutionException) { defaultExecutor.execute(block) }
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
- val timeout = (executor as? ScheduledExecutorService)
- ?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit)
- ?: scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
+ val timeout =
+ try { (executor as? ScheduledExecutorService)
+ ?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
+ catch (e: RejectedExecutionException) { null }
+ ?: defaultExecutor.schedule(ResumeRunnable(continuation), time, unit)
continuation.cancelFutureOnCompletion(timeout)
}
override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
- val timeout = (executor as? ScheduledExecutorService)
- ?.schedule(block, time, unit)
- ?: scheduledExecutor.schedule(block, time, unit)
+ val timeout =
+ try { (executor as? ScheduledExecutorService)
+ ?.schedule(block, time, unit) }
+ catch (e: RejectedExecutionException) { null }
+ ?: defaultExecutor.schedule(block, time, unit)
return DisposableFutureHandle(timeout)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
index 54b29dc..ac37185 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -18,8 +18,6 @@
import kotlinx.coroutines.experimental.selects.SelectBuilder
import kotlinx.coroutines.experimental.selects.select
-import java.util.concurrent.ScheduledExecutorService
-import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -27,44 +25,6 @@
import kotlin.coroutines.experimental.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
-private val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 1000L)
-
-@Volatile
-private var _scheduledExecutor: ScheduledExecutorService? = null
-
-internal val scheduledExecutor: ScheduledExecutorService get() =
- _scheduledExecutor ?: getOrCreateScheduledExecutorSync()
-
-@Synchronized
-private fun getOrCreateScheduledExecutorSync(): ScheduledExecutorService =
- _scheduledExecutor ?: ScheduledThreadPoolExecutor(1) { r ->
- Thread(r, "kotlinx.coroutines.ScheduledExecutor").apply { isDaemon = true }
- }.apply {
- setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS)
- allowCoreThreadTimeOut(true)
- executeExistingDelayedTasksAfterShutdownPolicy = false
- // "setRemoveOnCancelPolicy" is available only since JDK7, so try it via reflection
- try {
- val m = this::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.javaPrimitiveType)
- m.invoke(this, true)
- } catch (ex: Throwable) { /* ignore */ }
- _scheduledExecutor = this
- }
-
-// used for tests
-@Synchronized
-internal fun scheduledExecutorShutdownNow() {
- _scheduledExecutor?.shutdownNow()
-}
-
-@Synchronized
-internal fun scheduledExecutorShutdownNowAndRelease() {
- _scheduledExecutor?.apply {
- shutdownNow()
- _scheduledExecutor = null
- }
-}
-
/**
* Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
* [CancellationException] if timeout was exceeded.
@@ -93,7 +53,7 @@
// schedule cancellation of this coroutine on time
if (delay != null)
completion.disposeOnCompletion(delay.invokeOnTimeout(time, unit, completion)) else
- completion.cancelFutureOnCompletion(scheduledExecutor.schedule(completion, time, unit))
+ completion.cancelFutureOnCompletion(defaultExecutor.schedule(completion, time, unit))
completion.initParentJob(context[Job])
// restart block using new coroutine with new job,
// however start it as undispatched coroutine, because we are already in the proper context
@@ -141,7 +101,7 @@
// schedule cancellation of this coroutine on time
if (delay != null)
completion.disposeOnCompletion(delay.invokeOnTimeout(time, unit, completion)) else
- completion.cancelFutureOnCompletion(scheduledExecutor.schedule(completion, time, unit))
+ completion.cancelFutureOnCompletion(defaultExecutor.schedule(completion, time, unit))
completion.initParentJob(context[Job])
// restart block using new coroutine with new job,
// however start it as undispatched coroutine, because we are already in the proper context
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index 232bf51..f9b4721 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -440,7 +440,7 @@
val delay = context[ContinuationInterceptor] as? Delay
if (delay != null)
disposeOnSelect(delay.invokeOnTimeout(time, unit, action)) else
- disposeOnSelect(DisposableFutureHandle(scheduledExecutor.schedule(action, time, unit)))
+ disposeOnSelect(DisposableFutureHandle(defaultExecutor.schedule(action, time, unit)))
}
private class DisposeNode(
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
index 9655454..88e3e33 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
@@ -171,7 +171,7 @@
test { guide.context.example02.main(emptyArray()) }.verifyLinesStart(
" 'Unconfined': I'm working in thread main",
" 'context': I'm working in thread main",
- " 'Unconfined': After delay in thread kotlinx.coroutines.ScheduledExecutor",
+ " 'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor",
" 'context': After delay in thread main"
)
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt b/kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt
index 4cead9e..56b6464 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/guide/test/TestUtil.kt
@@ -42,10 +42,10 @@
// capture output
bytes = bytesOut.toByteArray()
// the shutdown
- scheduledExecutorShutdownNow()
+ defaultExecutorShutdownNow()
shutdownDispatcherPools()
CommonPool.shutdownAndRelease(10000L) // wait at most 10 sec
- scheduledExecutorShutdownNowAndRelease()
+ defaultExecutorShutdownNowAndRelease()
System.setOut(oldOut)
System.setErr(oldErr)