Select expression onTimeout clause
diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md
index efc003b..261b2a4 100644
--- a/kotlinx-coroutines-core/README.md
+++ b/kotlinx-coroutines-core/README.md
@@ -48,6 +48,7 @@
 | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive]             | [onReceive][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceive]             | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
 | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.experimental.selects.SelectBuilder.onReceiveOrNull] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]
 | [Mutex][kotlinx.coroutines.experimental.sync.Mutex]          | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock]                            | [onLock][kotlinx.coroutines.experimental.selects.SelectBuilder.onLock]                   | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock]
+| none            | [delay]                                        | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout]                   | none
 
 Cancellation support for user-defined suspending functions is available with [suspendCancellableCoroutine]
 helper function. [NonCancellable] job object is provided to suppress cancellation with 
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index e6daa03..2c984cf 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -16,6 +16,8 @@
 
 package kotlinx.coroutines.experimental
 
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.select
 import java.util.concurrent.Future
 import java.util.concurrent.TimeUnit
 import kotlin.coroutines.experimental.ContinuationInterceptor
@@ -74,6 +76,8 @@
  * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
  * immediately resumes with [CancellationException].
  *
+ * Note, that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
  * This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
  * implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
  */
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
index 200f9d2..c7a2d38 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -17,6 +17,8 @@
 package kotlinx.coroutines.experimental
 
 import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
+import kotlinx.coroutines.experimental.selects.SelectBuilder
+import kotlinx.coroutines.experimental.selects.select
 import java.util.concurrent.ScheduledExecutorService
 import java.util.concurrent.ScheduledThreadPoolExecutor
 import java.util.concurrent.TimeUnit
@@ -66,8 +68,13 @@
  * Runs a given suspending block of code inside a coroutine with a specified timeout and throws
  * [CancellationException] if timeout was exceeded.
  *
+ * Note, that timeout can be specified for [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
+ *
  * This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
  * implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
+ *
+ * @param time timeout time
+ * @param unit timeout unit (milliseconds by default)
  */
 public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
     require(time >= 0) { "Timeout time $time cannot be negative" }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index 815d7d9..e1e58b6 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -22,11 +22,14 @@
 import kotlinx.coroutines.experimental.channels.ReceiveChannel
 import kotlinx.coroutines.experimental.channels.SendChannel
 import kotlinx.coroutines.experimental.internal.AtomicDesc
-import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
 import kotlinx.coroutines.experimental.sync.Mutex
+import java.util.concurrent.TimeUnit
 import kotlin.coroutines.experimental.Continuation
+import kotlin.coroutines.experimental.ContinuationInterceptor
 import kotlin.coroutines.experimental.CoroutineContext
 import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
+import kotlin.coroutines.experimental.startCoroutine
 
 /**
  * Scope for [select] invocation.
@@ -76,6 +79,14 @@
      *        is already locked with the same token (same identity), this clause throws [IllegalStateException].
      */
     public fun Mutex.onLock(owner: Any? = null, block: suspend () -> R)
+
+    /**
+     * Clause that selects the given [block] after a specified timeout passes.
+     *
+     * @param time timeout time
+     * @param unit timeout unit (milliseconds by default)
+     */
+    public fun onTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> R)
 }
 
 /**
@@ -145,6 +156,7 @@
  * | [ReceiveChannel] | [receive][ReceiveChannel.receive]             | [onReceive][SelectBuilder.onReceive]             | [poll][ReceiveChannel.poll]
  * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][SelectBuilder.onReceiveOrNull] | [poll][ReceiveChannel.poll]
  * | [Mutex]          | [lock][Mutex.lock]                            | [onLock][SelectBuilder.onLock]                   | [tryLock][Mutex.tryLock]
+ * | none             | [delay]                                       | [onTimeout][SelectBuilder.onTimeout]             | none
  *
  * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
  * function is suspended, this function immediately resumes with [CancellationException].
@@ -240,6 +252,25 @@
         registerSelectLock(this@SelectBuilderImpl, owner, block)
     }
 
+    override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
+        require(time >= 0) { "Timeout time $time cannot be negative" }
+        if (time == 0L) {
+            if (trySelect(idempotent = null))
+                block.startCoroutineUndispatched(completion)
+            return
+        }
+        val action = Runnable {
+            // todo: we could have replaced startCoroutine with startCoroutineUndispatched
+            // But we need a way to know that Delay.invokeOnTimeout had used the right thread
+            if (trySelect(idempotent = null))
+                block.startCoroutine(completion)
+        }
+        val delay = context[ContinuationInterceptor] as? Delay
+        if (delay != null)
+            disposeOnSelect(delay.invokeOnTimeout(time, unit, action)) else
+            cancelFutureOnCompletion(scheduledExecutor.schedule(action, time, unit))
+    }
+
     override fun disposeOnSelect(handle: DisposableHandle) {
         invokeOnCompletion(DisposeOnCompletion(this, handle))
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
index 447a4c0..a08f976 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/SelectUnbiased.kt
@@ -22,6 +22,7 @@
 import kotlinx.coroutines.experimental.channels.SendChannel
 import kotlinx.coroutines.experimental.sync.Mutex
 import java.util.*
+import java.util.concurrent.TimeUnit
 import kotlin.coroutines.experimental.Continuation
 import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
 
@@ -91,4 +92,8 @@
     override fun Mutex.onLock(owner: Any?, block: suspend () -> R) {
         clauses += { registerSelectLock(instance, owner, block) }
     }
+
+    override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
+        clauses += { instance.onTimeout(time, unit, block) }
+    }
 }
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutThreadDispatchTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutThreadDispatchTest.kt
new file mode 100644
index 0000000..f1bfc78
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/WithTimeoutThreadDispatchTest.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.hamcrest.core.IsEqual
+import org.junit.After
+import org.junit.Assert
+import org.junit.Test
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.ThreadFactory
+import kotlin.coroutines.experimental.CoroutineContext
+
+class WithTimeoutThreadDispatchTest : TestBase() {
+    var executor: ExecutorService? = null
+
+    @After
+    fun tearDown() {
+        executor?.shutdown()
+    }
+
+    @Test
+    fun testCancellationDispatchScheduled() {
+        checkCancellationDispatch {
+            executor = Executors.newScheduledThreadPool(1, it)
+            executor!!.asCoroutineDispatcher()
+        }
+    }
+
+    @Test
+    fun testCancellationDispatchNonScheduled() {
+        checkCancellationDispatch {
+            executor = Executors.newSingleThreadExecutor(it)
+            executor!!.asCoroutineDispatcher()
+        }
+    }
+
+    @Test
+    fun testCancellationDispatchCustomNoDelay() {
+        checkCancellationDispatch {
+            executor = Executors.newSingleThreadExecutor(it)
+            object : CoroutineDispatcher() {
+                override fun dispatch(context: CoroutineContext, block: Runnable) {
+                    executor!!.execute(block)
+                }
+            }
+        }
+    }
+
+    private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
+        expect(1)
+        var thread: Thread? = null
+        val dispatcher = factory(ThreadFactory { Thread(it).also { thread = it } })
+        run(dispatcher) {
+            expect(2)
+            Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+            try {
+                withTimeout(100) {
+                    try {
+                        expect(3)
+                        delay(1000)
+                        expectUnreached()
+                    } catch (e: CancellationException) {
+                        expect(4)
+                        Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+                    }
+                    expect(5)
+                }
+            } catch (e: CancellationException) {
+                expect(6)
+                Assert.assertThat(Thread.currentThread(), IsEqual(thread))
+            }
+            expect(7)
+        }
+        finish(8)
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectTimeoutTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectTimeoutTest.kt
new file mode 100644
index 0000000..b8842c1
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectTimeoutTest.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental.selects
+
+import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.runBlocking
+import org.hamcrest.MatcherAssert.assertThat
+import org.hamcrest.core.IsEqual
+import org.junit.Test
+
+class SelectTimeoutTest : TestBase() {
+    @Test
+    fun testBasic() = runBlocking {
+        expect(1)
+        val result = select<String> {
+            onTimeout(1000) {
+                expectUnreached()
+                "FAIL"
+            }
+            onTimeout(100) {
+                expect(2)
+                "OK"
+            }
+            onTimeout(500) {
+                expectUnreached()
+                "FAIL"
+            }
+        }
+        assertThat(result, IsEqual("OK"))
+        finish(3)
+    }
+}
\ No newline at end of file