Implement DelayChannel
Fixes #327
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/DelayChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/DelayChannel.kt
new file mode 100644
index 0000000..9159b2d
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/DelayChannel.kt
@@ -0,0 +1,72 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.timeunit.*
+
+
+/**
+ * Creates rendezvous channel which emits the first item after the given initial delay and subsequent items with the
+ * given delay between emissions. Backpressure is guaranteed by [RendezvousChannel], but no upper bound on actual delay is guaranteed.
+ * If the consumer of this channel cannot keep up with given delay
+ * and spends more than [delay] between subsequent invocations to [ReceiveChannel.receive] then rate and a maximum delay
+ * of outcoming events will be limited by the consumer.
+ *
+ * This channel stops emission immediately after [ReceiveChannel.cancel] invocation.
+ * **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
+ *
+ * @param initialDelay delay after which the first item will be emitted
+ * @param delay delay between
+ * @param unit unit of time that applies to [initialDelay] and [delay]
+ */
+public fun DelayChannel(
+    delay: Long,
+    unit: TimeUnit = TimeUnit.MILLISECONDS,
+    initialDelay: Long = 0
+): ReceiveChannel<Unit> {
+    require(delay >= 0) { "Expected non-negative delay, but has $delay" }
+    require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
+
+    val result = RendezvousChannel<Unit>()
+    launch(start = CoroutineStart.UNDISPATCHED) {
+        delay(initialDelay, unit)
+        while (true) {
+            val sendTime = timeSource.nanoTime()
+            result.send(Unit)
+            val queueTime = timeSource.nanoTime() - sendTime
+            val nextDelay = (unit.toNanos(delay) - queueTime).coerceAtLeast(0L)
+            delay(nextDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
+        }
+    }
+
+    return result
+}
+
+/**
+ * Creates rendezvous channel which emits the first item after the given initial delay and subsequent items with the
+ * given delay after consumption of previously emitted item. Backpressure is guaranteed by [RendezvousChannel] machinery.
+ * This channel stops emitting items immediately after [ReceiveChannel.cancel] invocation.
+ * **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
+ *
+ * @param initialDelay delay after which the first item will be emitted
+ * @param delay delay between
+ * @param unit unit of time that applies to [initialDelay] and [delay]
+ */
+public fun FixedDelayChannel(
+    delay: Long,
+    unit: TimeUnit = TimeUnit.MILLISECONDS,
+    initialDelay: Long = 0
+): ReceiveChannel<Unit> {
+    require(delay >= 0) { "Expected non-negative delay, but has $delay" }
+    require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
+
+    val result = RendezvousChannel<Unit>()
+    launch(context = Unconfined) {
+        delay(initialDelay, unit)
+        while (true) {
+            result.send(Unit)
+            delay(delay, unit)
+        }
+    }
+
+    return result
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt
new file mode 100644
index 0000000..27b7071
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example10
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlin.coroutines.experimental.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val delayChannel = DelayChannel(delay = 100, initialDelay = 0) // create delay channel
+    var nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+    println("Initial element is available immediately: $nextElement") // Initial delay haven't passed yet
+
+    nextElement = withTimeoutOrNull(50) { delayChannel.receive() } // All subsequent elements has 100ms delay
+    println("Next element is not ready in 50 ms: $nextElement")
+
+    nextElement = withTimeoutOrNull(51) { delayChannel.receive() }
+    println("Next element is ready in 100 ms: $nextElement")
+
+    // Emulate large consumption delays
+    println("Consumer pause in 150ms")
+    delay(150)
+    // Next element is available immediately
+    nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+    println("Next element is available immediately after large consumer delay: $nextElement")
+    // Note that the pause between `receive` calls is taken into account and next element arrives faster
+    nextElement = withTimeoutOrNull(60) { delayChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
+    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
+
+    delayChannel.cancel() // indicate that no more elements are needed
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
index ac59812..535d9f5 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
@@ -372,6 +372,18 @@
     }
 
     @Test
+    fun testGuideChannelExample10() {
+        test("GuideChannelExample10") { guide.channel.example10.main(emptyArray()) }.verifyLines(
+            "Initial element is available immediately: kotlin.Unit",
+            "Next element is not ready in 50 ms: null",
+            "Next element is ready in 100 ms: kotlin.Unit",
+            "Consumer pause in 150ms",
+            "Next element is available immediately after large consumer delay: kotlin.Unit",
+            "Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit"
+        )
+    }
+
+    @Test
     fun testGuideSyncExample01() {
         test("GuideSyncExample01") { guide.sync.example01.main(emptyArray()) }.verifyLinesStart(
             "Completed 1000000 actions in",
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelCommonTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelCommonTest.kt
new file mode 100644
index 0000000..f69e29b
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelCommonTest.kt
@@ -0,0 +1,160 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.selects.*
+import org.junit.Test
+import org.junit.runner.*
+import org.junit.runners.*
+import java.io.*
+import kotlin.test.*
+
+
+@RunWith(Parameterized::class)
+class TimerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
+
+    companion object {
+        @Parameterized.Parameters(name = "{0}")
+        @JvmStatic
+        fun params(): Collection<Array<Any>> =
+            Channel.values().map { arrayOf<Any>(it) }
+    }
+
+    enum class Channel {
+        DELAY {
+            override fun invoke(delay: Long, initialDelay: Long) = DelayChannel(delay, initialDelay = initialDelay)
+        },
+
+        FIXED_DELAY {
+            override fun invoke(delay: Long, initialDelay: Long) = FixedDelayChannel(delay, initialDelay = initialDelay)
+        };
+
+
+        abstract operator fun invoke(delay: Long, initialDelay: Long = 0): ReceiveChannel<Unit>
+    }
+
+
+    @Test
+    fun testDelay() = runTest {
+        val delayChannel = channelFactory(delay = 100)
+        delayChannel.checkNotEmpty()
+        delayChannel.checkEmpty()
+
+        delay(50)
+        delayChannel.checkEmpty()
+        delay(52)
+        delayChannel.checkNotEmpty()
+
+        delayChannel.cancel()
+        delay(52)
+        delayChannel.checkEmpty()
+        delayChannel.cancel()
+    }
+
+    @Test
+    fun testInitialDelay() = runBlocking<Unit> {
+        val delayChannel = channelFactory(initialDelay = 75, delay = 100)
+        delayChannel.checkEmpty()
+        delay(50)
+        delayChannel.checkEmpty()
+        delay(30)
+        delayChannel.checkNotEmpty()
+
+        // Regular delay
+        delay(75)
+        delayChannel.checkEmpty()
+        delay(26)
+        delayChannel.checkNotEmpty()
+        delayChannel.cancel()
+    }
+
+
+    @Test
+    fun testReceive() = runBlocking<Unit> {
+        val delayChannel = channelFactory(delay = 100)
+        delayChannel.checkNotEmpty()
+        var value = withTimeoutOrNull(75) {
+            delayChannel.receive()
+            1
+        }
+
+        assertNull(value)
+        value = withTimeoutOrNull(26) {
+            delayChannel.receive()
+            1
+        }
+
+        assertNotNull(value)
+        delayChannel.cancel()
+    }
+
+    @Test
+    fun testComplexOperator() = runBlocking {
+        val producer = produce {
+            for (i in 1..7) {
+                send(i)
+                delay(100)
+            }
+        }
+
+        val averages = producer.averageInTimeWindow(300).toList()
+        assertEquals(listOf(2.0, 5.0, 7.0), averages)
+    }
+
+    private fun ReceiveChannel<Int>.averageInTimeWindow(timespan: Long) = produce {
+        val delayChannel = channelFactory(delay = timespan, initialDelay = timespan)
+        var sum = 0
+        var n = 0
+        whileSelect {
+            this@averageInTimeWindow.onReceiveOrNull {
+                when (it) {
+                    null -> {
+                        // Send leftovers and bail out
+                        if (n != 0) send(sum / n.toDouble())
+                        false
+                    }
+                    else -> {
+                        sum += it
+                        ++n
+                        true
+                    }
+                }
+            }
+
+            // Timeout, send aggregated average and reset counters
+            delayChannel.onReceive {
+                send(sum / n.toDouble())
+                sum = 0
+                n = 0
+                true
+            }
+        }
+
+        delayChannel.cancel()
+    }
+
+    @Test
+    fun testStress() = runBlocking<Unit> {
+        // No OOM/SOE
+        val iterations = 500_000 * stressTestMultiplier
+        val delayChannel = channelFactory(0)
+        repeat(iterations) {
+            delayChannel.receive()
+        }
+
+        delayChannel.cancel()
+    }
+
+    @Test(expected = IllegalArgumentException::class)
+    fun testNegativeDelay() {
+        channelFactory(-1)
+    }
+
+    @Test(expected = IllegalArgumentException::class)
+    fun testNegativeInitialDelay() {
+        channelFactory(initialDelay = -1, delay = 100)
+    }
+}
+
+fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll())
+
+fun ReceiveChannel<Unit>.checkNotEmpty() = assertNotNull(poll())
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelTest.kt
new file mode 100644
index 0000000..4905603
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelTest.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+
+class DelayChannelTest : TestBase() {
+
+    @Test
+    fun testFixedDelayChannelBackpressure() = runBlocking<Unit> {
+        val delayChannel = FixedDelayChannel(delay = 100)
+        delayChannel.checkNotEmpty()
+        delayChannel.checkEmpty()
+
+        delay(150)
+        delayChannel.checkNotEmpty()
+        delay(50)
+        delayChannel.checkEmpty()
+        delay(52)
+        delayChannel.checkNotEmpty()
+        delayChannel.cancel()
+    }
+
+    @Test
+    fun testDelayChannelBackpressure() = runBlocking<Unit> {
+        val delayChannel = DelayChannel(delay = 100)
+        delayChannel.checkNotEmpty()
+        delayChannel.checkEmpty()
+
+        delay(150)
+        delayChannel.checkNotEmpty()
+        delay(52)
+        delayChannel.checkNotEmpty()
+        delay(50)
+        delayChannel.checkEmpty()
+        delay(52)
+        delayChannel.checkNotEmpty()
+        delayChannel.cancel()
+    }
+}
diff --git a/coroutines-guide.md b/coroutines-guide.md
index 90ff4be..5941471 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -88,6 +88,7 @@
   * [Fan-out](#fan-out)
   * [Fan-in](#fan-in)
   * [Buffered channels](#buffered-channels)
+  * [Delay channels](#delay-channels)
   * [Channels are fair](#channels-are-fair)
 * [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
   * [The problem](#the-problem)
@@ -1666,6 +1667,63 @@
 
 The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
 
+### Delay channels
+
+Delay channel is a special rendezvous channel, which emits `Unit` every time given delay passes since last consumption from this channel.
+Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce] operators, using this channel as one of [select] clauses and performing "timeout" action in its [onReceive][ReceiveChannel.onReceive].
+
+To create such channel, use factory methods [DelayChannel()] and [FixedDelayChannel()] and to indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
+
+Now let's see how it works in practice:
+<!--- INCLUDE  
+import kotlin.coroutines.experimental.*
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val delayChannel = DelayChannel(delay = 100, initialDelay = 0) // create delay channel
+    var nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+    println("Initial element is available immediately: $nextElement") // Initial delay haven't passed yet
+
+    nextElement = withTimeoutOrNull(50) { delayChannel.receive() } // All subsequent elements has 100ms delay
+    println("Next element is not ready in 50 ms: $nextElement")
+
+    nextElement = withTimeoutOrNull(51) { delayChannel.receive() }
+    println("Next element is ready in 100 ms: $nextElement")
+
+    // Emulate large consumption delays
+    println("Consumer pause in 150ms")
+    delay(150)
+    // Next element is available immediately
+    nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+    println("Next element is available immediately after large consumer delay: $nextElement")
+    // Note that the pause between `receive` calls is taken into account and next element arrives faster
+    nextElement = withTimeoutOrNull(60) { delayChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
+    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
+
+    delayChannel.cancel() // indicate that no more elements are needed
+}
+```
+
+> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt)
+
+It prints following lines:
+
+```text
+Initial element is available immediately: kotlin.Unit
+Next element is not ready in 50 ms: null
+Next element is ready in 100 ms: kotlin.Unit
+Consumer pause in 150ms
+Next element is available immediately after large consumer delay: kotlin.Unit
+Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
+```
+
+<!--- TEST -->
+
+Note that [DelayChannel()] is aware of possible consumer pauses and adapts next element emission if a pause occurs. 
+[FixedDelayChannel()] doesn't do so and simply emits elements with fixed delay after consumption, but both have built-in backpressure 
+via [RendezvousChannel]
+
 ### Channels are fair
 
 Send and receive operations to channels are _fair_ with respect to the order of their invocation from 
@@ -2444,8 +2502,12 @@
 [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
 [consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
 [Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel.html
-[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
 [ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html
+[DelayChannel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-delay-channel.html
+[FixedDelayChannel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-fixed-delay-channel.html
+[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/cancel.html
+[RendezvousChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-rendezvous-channel/index.html
+[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
 [ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive-or-null.html
 [SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html
 <!--- INDEX kotlinx.coroutines.experimental.selects -->