Implemented withTimeoutOrNull function
diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md
index 261b2a4..2e39284 100644
--- a/kotlinx-coroutines-core/README.md
+++ b/kotlinx-coroutines-core/README.md
@@ -31,12 +31,13 @@
Top-level suspending functions:
-| **Name** | **Description**
-| ------------- | ---------------
-| [delay] | Non-blocking sleep
-| [yield] | Yields thread in single-threaded dispatchers
-| [run] | Switches to a different context
-| [withTimeout] | Set execution time-limit (deadline)
+| **Name** | **Description**
+| ------------------- | ---------------
+| [delay] | Non-blocking sleep
+| [yield] | Yields thread in single-threaded dispatchers
+| [run] | Switches to a different context
+| [withTimeout] | Set execution time-limit with exception on timeout
+| [withTimeoutOrNull] | Set execution time-limit will null result on timeout
[Select][kotlinx.coroutines.experimental.selects.select] expression waits for the result of multiple suspending functions simultaneously:
@@ -93,6 +94,7 @@
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
[run]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run.html
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
+[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
[Job.isCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/is-completed.html
[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html
@@ -126,4 +128,5 @@
[kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive.html
[kotlinx.coroutines.experimental.selects.SelectBuilder.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-receive-or-null.html
[kotlinx.coroutines.experimental.selects.SelectBuilder.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-lock.html
+[kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-timeout.html
<!--- END -->
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
index 02fe4d0..2a56b45 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -229,7 +229,7 @@
}
override fun completeResume(token: Any) {
- completeUpdateState(token, state, defaultResumeMode())
+ completeUpdateState(token, state, defaultResumeMode)
}
override fun afterCompletion(state: Any?, mode: Int) {
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index ea42def..e976f7e 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -67,9 +67,11 @@
protected open fun createContext() = parentContext + this
- protected open fun defaultResumeMode(): Int = MODE_DISPATCHED
+ protected open val defaultResumeMode: Int get() = MODE_DISPATCHED
- final override fun resume(value: T) = resume(value, defaultResumeMode())
+ protected open val ignoreRepeatedResume: Boolean get() = false
+
+ final override fun resume(value: T) = resume(value, defaultResumeMode)
protected fun resume(value: T, mode: Int) {
while (true) { // lock-free loop on state
@@ -77,12 +79,17 @@
when (state) {
is Incomplete -> if (updateState(state, value, mode)) return
is Cancelled -> return // ignore resumes on cancelled continuation
- else -> throw IllegalStateException("Already resumed, but got value $value")
+ else -> {
+ if (ignoreRepeatedResume) {
+ return
+ } else
+ throw IllegalStateException("Already resumed, but got value $value")
+ }
}
}
}
- final override fun resumeWithException(exception: Throwable) = resumeWithException(exception, defaultResumeMode())
+ final override fun resumeWithException(exception: Throwable) = resumeWithException(exception, defaultResumeMode)
protected fun resumeWithException(exception: Throwable, mode: Int) {
while (true) { // lock-free loop on state
@@ -96,7 +103,13 @@
if (exception != state.exception) handleCoroutineException(context, exception)
return
}
- else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
+ else -> {
+ if (ignoreRepeatedResume) {
+ handleCoroutineException(context, exception)
+ return
+ } else
+ throw IllegalStateException("Already resumed, but got exception $exception", exception)
+ }
}
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
index c7a2d38..8083ce3 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -65,10 +65,15 @@
}
/**
- * Runs a given suspending block of code inside a coroutine with a specified timeout and throws
+ * Runs a given suspending [block] of code inside a coroutine with a specified timeout and throws
* [CancellationException] if timeout was exceeded.
*
- * Note, that timeout can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ * The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
+ * exception inside of it, too. However, even the code in the block suppresses the exception,
+ * this `withTimeout` function invocation still throws [CancellationException].
+ *
+ * The sibling function that does not throw exception on timeout is [withTimeoutOrNull].
+ * Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
@@ -81,7 +86,7 @@
if (time <= 0L) throw CancellationException("Timed out immediately")
return suspendCoroutineOrReturn sc@ { delegate: Continuation<T> ->
// schedule cancellation of this continuation on time
- val cont = TimeoutContinuation(time, unit, delegate)
+ val cont = TimeoutExceptionContinuation(time, unit, delegate)
val delay = cont.context[ContinuationInterceptor] as? Delay
if (delay != null)
cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
@@ -93,11 +98,53 @@
}
}
-private class TimeoutContinuation<T>(
+/**
+ * Runs a given suspending block of code inside a coroutine with a specified timeout and returns
+ * `null` if timeout was exceeded.
+ *
+ * The code that is executing inside the [block] is cancelled on timeout and throws [CancellationException]
+ * exception inside of it. However, even the code in the block does not catch the cancellation exception,
+ * this `withTimeoutOrNull` function invocation still returns `null` on timeout.
+ *
+ * The sibling function that throws exception on timeout is [withTimeout].
+ * Note, that timeout action can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
+ * This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
+ * implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
+ *
+ * @param time timeout time
+ * @param unit timeout unit (milliseconds by default)
+ */
+public suspend fun <T> withTimeoutOrNull(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T? {
+ require(time >= 0) { "Timeout time $time cannot be negative" }
+ if (time <= 0L) return null
+ return suspendCoroutineOrReturn sc@ { delegate: Continuation<T?> ->
+ // schedule cancellation of this continuation on time
+ val cont = TimeoutNullContinuation<T>(delegate)
+ val delay = cont.context[ContinuationInterceptor] as? Delay
+ if (delay != null)
+ cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, cont)) else
+ cont.cancelFutureOnCompletion(scheduledExecutor.schedule(cont, time, unit))
+ // restart block using cancellable context of this continuation,
+ // however start it as undispatched coroutine, because we are already in the proper context
+ block.startCoroutineUndispatched(cont)
+ cont.getResult()
+ }
+}
+
+private class TimeoutExceptionContinuation<in T>(
private val time: Long,
private val unit: TimeUnit,
delegate: Continuation<T>
) : CancellableContinuationImpl<T>(delegate, active = true), Runnable {
- override fun defaultResumeMode(): Int = MODE_DIRECT
+ override val defaultResumeMode get() = MODE_DIRECT
override fun run() { cancel(CancellationException("Timed out waiting for $time $unit")) }
}
+
+private class TimeoutNullContinuation<in T>(
+ delegate: Continuation<T?>
+) : CancellableContinuationImpl<T?>(delegate, active = true), Runnable {
+ override val defaultResumeMode get() = MODE_DIRECT
+ override val ignoreRepeatedResume: Boolean get() = true
+ override fun run() { resume(null, mode = 0) /* dispatch resume */ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index e1e58b6..fc75e17 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -216,7 +216,7 @@
cancel(cause)
}
- override fun defaultResumeMode(): Int = MODE_DIRECT // all resumes through completion are dispatched directly
+ override val defaultResumeMode get() = MODE_DIRECT // all resumes through completion are dispatched directly
override val completion: Continuation<R> get() {
check(isSelected) { "Must be selected first" }
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt
new file mode 100644
index 0000000..453fd67
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullTest.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.hamcrest.core.IsEqual
+import org.hamcrest.core.IsNull
+import org.junit.Assert.assertThat
+import org.junit.Test
+
+class WithTimeoutOrNullTest : TestBase() {
+ /**
+ * Tests property dispatching of `withTimeoutOrNull` blocks
+ */
+ @Test
+ fun testDispatch() = runBlocking {
+ expect(1)
+ launch(context) {
+ expect(4)
+ yield() // back to main
+ expect(7)
+ }
+ expect(2)
+ // test that it does not yield to the above job when started
+ val result = withTimeoutOrNull(1000) {
+ expect(3)
+ yield() // yield only now
+ expect(5)
+ "OK"
+ }
+ assertThat(result, IsEqual("OK"))
+ expect(6)
+ yield() // back to launch
+ finish(8)
+ }
+
+ /**
+ * Tests that a 100% CPU-consuming loop will react on timeout if it has yields.
+ */
+ @Test
+ fun testYieldBlockingWithTimeout() = runBlocking {
+ expect(1)
+ val result = withTimeoutOrNull(100) {
+ while (true) {
+ yield()
+ }
+ }
+ assertThat(result, IsNull())
+ finish(2)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullThreadDispatchTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullThreadDispatchTest.kt
new file mode 100644
index 0000000..d6d726f
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutOrNullThreadDispatchTest.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.hamcrest.core.IsEqual
+import org.hamcrest.core.IsNull
+import org.junit.After
+import org.junit.Assert
+import org.junit.Assert.assertThat
+import org.junit.Test
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.ThreadFactory
+import kotlin.coroutines.experimental.CoroutineContext
+
+class WithTimeoutOrNullThreadDispatchTest : TestBase() {
+ var executor: ExecutorService? = null
+
+ @After
+ fun tearDown() {
+ executor?.shutdown()
+ }
+
+ @Test
+ fun testCancellationDispatchScheduled() {
+ checkCancellationDispatch {
+ executor = Executors.newScheduledThreadPool(1, it)
+ executor!!.asCoroutineDispatcher()
+ }
+ }
+
+ @Test
+ fun testCancellationDispatchNonScheduled() {
+ checkCancellationDispatch {
+ executor = Executors.newSingleThreadExecutor(it)
+ executor!!.asCoroutineDispatcher()
+ }
+ }
+
+ @Test
+ fun testCancellationDispatchCustomNoDelay() {
+ checkCancellationDispatch {
+ executor = Executors.newSingleThreadExecutor(it)
+ object : CoroutineDispatcher() {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ executor!!.execute(block)
+ }
+ }
+ }
+ }
+
+ private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
+ expect(1)
+ var thread: Thread? = null
+ val dispatcher = factory(ThreadFactory { Thread(it).also { thread = it } })
+ run(dispatcher) {
+ expect(2)
+ Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+ val result =
+ withTimeoutOrNull(100) {
+ try {
+ expect(3)
+ delay(1000)
+ expectUnreached()
+ } catch (e: CancellationException) {
+ expect(4)
+ Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+ }
+ expect(5)
+ "FAIL"
+ }
+ assertThat(result, IsNull())
+ expect(6)
+ }
+ finish(7)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt
index fde334b..3ae3930 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutTest.kt
@@ -16,6 +16,8 @@
package kotlinx.coroutines.experimental
+import org.hamcrest.core.IsEqual
+import org.junit.Assert.assertThat
import org.junit.Test
class WithTimeoutTest : TestBase() {
@@ -32,11 +34,13 @@
}
expect(2)
// test that it does not yield to the above job when started
- withTimeout(1000) {
+ val result = withTimeout(1000) {
expect(3)
yield() // yield only now
expect(5)
+ "OK"
}
+ assertThat(result, IsEqual("OK"))
expect(6)
yield() // back to launch
finish(8)