Select expression onTimeout clause
diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md
index efc003b..261b2a4 100644
--- a/kotlinx-coroutines-core/README.md
+++ b/kotlinx-coroutines-core/README.md
@@ -48,6 +48,7 @@
| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceiveOrNull] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.selects.SelectBuilder.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock]
+| none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none
Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine]
helper function. [NonCancellable] job object is provided to suppress cancellation with
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index e6daa03..2c984cf 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -16,6 +16,8 @@
package kotlinx.coroutines.experimental
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.select
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -74,6 +76,8 @@
* If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
* immediately resumes with [CancellationException].
*
+ * Note, that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
* This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
*/
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
index 200f9d2..c7a2d38 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -17,6 +17,8 @@
package kotlinx.coroutines.experimental
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.select
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
@@ -66,8 +68,13 @@
* Runs a given suspending block of code inside a coroutine with a specified timeout and throws
* [CancellationException] if timeout was exceeded.
*
+ * Note, that timeout can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
* This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
* implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
+ *
+ * @param time timeout time
+ * @param unit timeout unit (milliseconds by default)
*/
public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
require(time >= 0) { "Timeout time $time cannot be negative" }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index 815d7d9..e1e58b6 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -22,11 +22,14 @@
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.internal.AtomicDesc
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.sync.Mutex
+import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.Continuation
+import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
+import kotlin.coroutines.experimental.startCoroutine
/**
* Scope for [select] invocation.
@@ -76,6 +79,14 @@
* is already locked with the same token (same identity), this clause throws [IllegalStateException].
*/
public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R)
+
+ /**
+ * Clause that selects the given [block] after a specified timeout passes.
+ *
+ * @param time timeout time
+ * @param unit timeout unit (milliseconds by default)
+ */
+ public fun onTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> R)
}
/**
@@ -145,6 +156,7 @@
* | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][SelectBuilder.onReceive] | [poll][ReceiveChannel.poll]
* | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
* | [Mutex] | [lock][Mutex.lock] | [onLock][SelectBuilder.onLock] | [tryLock][Mutex.tryLock]
+ * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] | none
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
@@ -240,6 +252,25 @@
registerSelectLock(this@SelectBuilderImpl, owner, block)
}
+ override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
+ require(time >= 0) { "Timeout time $time cannot be negative" }
+ if (time == 0L) {
+ if (trySelect(idempotent = null))
+ block.startCoroutineUndispatched(completion)
+ return
+ }
+ val action = Runnable {
+ // todo: we could have replaced startCoroutine with startCoroutineUndispatched
+ // But we need a way to know that Delay.invokeOnTimeout had used the right thread
+ if (trySelect(idempotent = null))
+ block.startCoroutine(completion)
+ }
+ val delay = context[ContinuationInterceptor] as? Delay
+ if (delay != null)
+ disposeOnSelect(delay.invokeOnTimeout(time, unit, action)) else
+ cancelFutureOnCompletion(scheduledExecutor.schedule(action, time, unit))
+ }
+
override fun disposeOnSelect(handle: DisposableHandle) {
invokeOnCompletion(DisposeOnCompletion(this, handle))
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
index 447a4c0..a08f976 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
@@ -22,6 +22,7 @@
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.sync.Mutex
import java.util.*
+import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
@@ -91,4 +92,8 @@
override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
clauses += { registerSelectLock(instance, owner, block) }
}
+
+ override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
+ clauses += { instance.onTimeout(time, unit, block) }
+ }
}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutThreadDispatchTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutThreadDispatchTest.kt
new file mode 100644
index 0000000..f1bfc78
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutThreadDispatchTest.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.hamcrest.core.IsEqual
+import org.junit.After
+import org.junit.Assert
+import org.junit.Test
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.ThreadFactory
+import kotlin.coroutines.experimental.CoroutineContext
+
+class WithTimeoutThreadDispatchTest : TestBase() {
+ var executor: ExecutorService? = null
+
+ @After
+ fun tearDown() {
+ executor?.shutdown()
+ }
+
+ @Test
+ fun testCancellationDispatchScheduled() {
+ checkCancellationDispatch {
+ executor = Executors.newScheduledThreadPool(1, it)
+ executor!!.asCoroutineDispatcher()
+ }
+ }
+
+ @Test
+ fun testCancellationDispatchNonScheduled() {
+ checkCancellationDispatch {
+ executor = Executors.newSingleThreadExecutor(it)
+ executor!!.asCoroutineDispatcher()
+ }
+ }
+
+ @Test
+ fun testCancellationDispatchCustomNoDelay() {
+ checkCancellationDispatch {
+ executor = Executors.newSingleThreadExecutor(it)
+ object : CoroutineDispatcher() {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ executor!!.execute(block)
+ }
+ }
+ }
+ }
+
+ private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
+ expect(1)
+ var thread: Thread? = null
+ val dispatcher = factory(ThreadFactory { Thread(it).also { thread = it } })
+ run(dispatcher) {
+ expect(2)
+ Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+ try {
+ withTimeout(100) {
+ try {
+ expect(3)
+ delay(1000)
+ expectUnreached()
+ } catch (e: CancellationException) {
+ expect(4)
+ Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+ }
+ expect(5)
+ }
+ } catch (e: CancellationException) {
+ expect(6)
+ Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+ }
+ expect(7)
+ }
+ finish(8)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectTimeoutTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectTimeoutTest.kt
new file mode 100644
index 0000000..b8842c1
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectTimeoutTest.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.runBlocking
+import org.hamcrest.MatcherAssert.assertThat
+import org.hamcrest.core.IsEqual
+import org.junit.Test
+
+class SelectTimeoutTest : TestBase() {
+ @Test
+ fun testBasic() = runBlocking {
+ expect(1)
+ val result = select<String> {
+ onTimeout(1000) {
+ expectUnreached()
+ "FAIL"
+ }
+ onTimeout(100) {
+ expect(2)
+ "OK"
+ }
+ onTimeout(500) {
+ expectUnreached()
+ "FAIL"
+ }
+ }
+ assertThat(result, IsEqual("OK"))
+ finish(3)
+ }
+}
\ No newline at end of file