Implement DelayChannel
Fixes #327
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/DelayChannel.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/DelayChannel.kt
new file mode 100644
index 0000000..9159b2d
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/DelayChannel.kt
@@ -0,0 +1,72 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.timeunit.*
+
+
+/**
+ * Creates rendezvous channel which emits the first item after the given initial delay and subsequent items with the
+ * given delay between emissions. Backpressure is guaranteed by [RendezvousChannel], but no upper bound on actual delay is guaranteed.
+ * If the consumer of this channel cannot keep up with given delay
+ * and spends more than [delay] between subsequent invocations to [ReceiveChannel.receive] then rate and a maximum delay
+ * of outcoming events will be limited by the consumer.
+ *
+ * This channel stops emission immediately after [ReceiveChannel.cancel] invocation.
+ * **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
+ *
+ * @param initialDelay delay after which the first item will be emitted
+ * @param delay delay between
+ * @param unit unit of time that applies to [initialDelay] and [delay]
+ */
+public fun DelayChannel(
+ delay: Long,
+ unit: TimeUnit = TimeUnit.MILLISECONDS,
+ initialDelay: Long = 0
+): ReceiveChannel<Unit> {
+ require(delay >= 0) { "Expected non-negative delay, but has $delay" }
+ require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
+
+ val result = RendezvousChannel<Unit>()
+ launch(start = CoroutineStart.UNDISPATCHED) {
+ delay(initialDelay, unit)
+ while (true) {
+ val sendTime = timeSource.nanoTime()
+ result.send(Unit)
+ val queueTime = timeSource.nanoTime() - sendTime
+ val nextDelay = (unit.toNanos(delay) - queueTime).coerceAtLeast(0L)
+ delay(nextDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
+ }
+ }
+
+ return result
+}
+
+/**
+ * Creates rendezvous channel which emits the first item after the given initial delay and subsequent items with the
+ * given delay after consumption of previously emitted item. Backpressure is guaranteed by [RendezvousChannel] machinery.
+ * This channel stops emitting items immediately after [ReceiveChannel.cancel] invocation.
+ * **Note** producer to this channel is dispatched via [Unconfined] dispatcher and started eagerly
+ *
+ * @param initialDelay delay after which the first item will be emitted
+ * @param delay delay between
+ * @param unit unit of time that applies to [initialDelay] and [delay]
+ */
+public fun FixedDelayChannel(
+ delay: Long,
+ unit: TimeUnit = TimeUnit.MILLISECONDS,
+ initialDelay: Long = 0
+): ReceiveChannel<Unit> {
+ require(delay >= 0) { "Expected non-negative delay, but has $delay" }
+ require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
+
+ val result = RendezvousChannel<Unit>()
+ launch(context = Unconfined) {
+ delay(initialDelay, unit)
+ while (true) {
+ result.send(Unit)
+ delay(delay, unit)
+ }
+ }
+
+ return result
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt
new file mode 100644
index 0000000..27b7071
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package guide.channel.example10
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.*
+import kotlin.coroutines.experimental.*
+
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val delayChannel = DelayChannel(delay = 100, initialDelay = 0) // create delay channel
+ var nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+ println("Initial element is available immediately: $nextElement") // Initial delay haven't passed yet
+
+ nextElement = withTimeoutOrNull(50) { delayChannel.receive() } // All subsequent elements has 100ms delay
+ println("Next element is not ready in 50 ms: $nextElement")
+
+ nextElement = withTimeoutOrNull(51) { delayChannel.receive() }
+ println("Next element is ready in 100 ms: $nextElement")
+
+ // Emulate large consumption delays
+ println("Consumer pause in 150ms")
+ delay(150)
+ // Next element is available immediately
+ nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+ println("Next element is available immediately after large consumer delay: $nextElement")
+ // Note that the pause between `receive` calls is taken into account and next element arrives faster
+ nextElement = withTimeoutOrNull(60) { delayChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
+ println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
+
+ delayChannel.cancel() // indicate that no more elements are needed
+}
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
index ac59812..535d9f5 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt
@@ -372,6 +372,18 @@
}
@Test
+ fun testGuideChannelExample10() {
+ test("GuideChannelExample10") { guide.channel.example10.main(emptyArray()) }.verifyLines(
+ "Initial element is available immediately: kotlin.Unit",
+ "Next element is not ready in 50 ms: null",
+ "Next element is ready in 100 ms: kotlin.Unit",
+ "Consumer pause in 150ms",
+ "Next element is available immediately after large consumer delay: kotlin.Unit",
+ "Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit"
+ )
+ }
+
+ @Test
fun testGuideSyncExample01() {
test("GuideSyncExample01") { guide.sync.example01.main(emptyArray()) }.verifyLinesStart(
"Completed 1000000 actions in",
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelCommonTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelCommonTest.kt
new file mode 100644
index 0000000..f69e29b
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelCommonTest.kt
@@ -0,0 +1,160 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.selects.*
+import org.junit.Test
+import org.junit.runner.*
+import org.junit.runners.*
+import java.io.*
+import kotlin.test.*
+
+
+@RunWith(Parameterized::class)
+class TimerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
+
+ companion object {
+ @Parameterized.Parameters(name = "{0}")
+ @JvmStatic
+ fun params(): Collection<Array<Any>> =
+ Channel.values().map { arrayOf<Any>(it) }
+ }
+
+ enum class Channel {
+ DELAY {
+ override fun invoke(delay: Long, initialDelay: Long) = DelayChannel(delay, initialDelay = initialDelay)
+ },
+
+ FIXED_DELAY {
+ override fun invoke(delay: Long, initialDelay: Long) = FixedDelayChannel(delay, initialDelay = initialDelay)
+ };
+
+
+ abstract operator fun invoke(delay: Long, initialDelay: Long = 0): ReceiveChannel<Unit>
+ }
+
+
+ @Test
+ fun testDelay() = runTest {
+ val delayChannel = channelFactory(delay = 100)
+ delayChannel.checkNotEmpty()
+ delayChannel.checkEmpty()
+
+ delay(50)
+ delayChannel.checkEmpty()
+ delay(52)
+ delayChannel.checkNotEmpty()
+
+ delayChannel.cancel()
+ delay(52)
+ delayChannel.checkEmpty()
+ delayChannel.cancel()
+ }
+
+ @Test
+ fun testInitialDelay() = runBlocking<Unit> {
+ val delayChannel = channelFactory(initialDelay = 75, delay = 100)
+ delayChannel.checkEmpty()
+ delay(50)
+ delayChannel.checkEmpty()
+ delay(30)
+ delayChannel.checkNotEmpty()
+
+ // Regular delay
+ delay(75)
+ delayChannel.checkEmpty()
+ delay(26)
+ delayChannel.checkNotEmpty()
+ delayChannel.cancel()
+ }
+
+
+ @Test
+ fun testReceive() = runBlocking<Unit> {
+ val delayChannel = channelFactory(delay = 100)
+ delayChannel.checkNotEmpty()
+ var value = withTimeoutOrNull(75) {
+ delayChannel.receive()
+ 1
+ }
+
+ assertNull(value)
+ value = withTimeoutOrNull(26) {
+ delayChannel.receive()
+ 1
+ }
+
+ assertNotNull(value)
+ delayChannel.cancel()
+ }
+
+ @Test
+ fun testComplexOperator() = runBlocking {
+ val producer = produce {
+ for (i in 1..7) {
+ send(i)
+ delay(100)
+ }
+ }
+
+ val averages = producer.averageInTimeWindow(300).toList()
+ assertEquals(listOf(2.0, 5.0, 7.0), averages)
+ }
+
+ private fun ReceiveChannel<Int>.averageInTimeWindow(timespan: Long) = produce {
+ val delayChannel = channelFactory(delay = timespan, initialDelay = timespan)
+ var sum = 0
+ var n = 0
+ whileSelect {
+ this@averageInTimeWindow.onReceiveOrNull {
+ when (it) {
+ null -> {
+ // Send leftovers and bail out
+ if (n != 0) send(sum / n.toDouble())
+ false
+ }
+ else -> {
+ sum += it
+ ++n
+ true
+ }
+ }
+ }
+
+ // Timeout, send aggregated average and reset counters
+ delayChannel.onReceive {
+ send(sum / n.toDouble())
+ sum = 0
+ n = 0
+ true
+ }
+ }
+
+ delayChannel.cancel()
+ }
+
+ @Test
+ fun testStress() = runBlocking<Unit> {
+ // No OOM/SOE
+ val iterations = 500_000 * stressTestMultiplier
+ val delayChannel = channelFactory(0)
+ repeat(iterations) {
+ delayChannel.receive()
+ }
+
+ delayChannel.cancel()
+ }
+
+ @Test(expected = IllegalArgumentException::class)
+ fun testNegativeDelay() {
+ channelFactory(-1)
+ }
+
+ @Test(expected = IllegalArgumentException::class)
+ fun testNegativeInitialDelay() {
+ channelFactory(initialDelay = -1, delay = 100)
+ }
+}
+
+fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll())
+
+fun ReceiveChannel<Unit>.checkNotEmpty() = assertNotNull(poll())
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelTest.kt
new file mode 100644
index 0000000..4905603
--- /dev/null
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DelayChannelTest.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.experimental.channels
+
+import kotlinx.coroutines.experimental.*
+import org.junit.*
+
+class DelayChannelTest : TestBase() {
+
+ @Test
+ fun testFixedDelayChannelBackpressure() = runBlocking<Unit> {
+ val delayChannel = FixedDelayChannel(delay = 100)
+ delayChannel.checkNotEmpty()
+ delayChannel.checkEmpty()
+
+ delay(150)
+ delayChannel.checkNotEmpty()
+ delay(50)
+ delayChannel.checkEmpty()
+ delay(52)
+ delayChannel.checkNotEmpty()
+ delayChannel.cancel()
+ }
+
+ @Test
+ fun testDelayChannelBackpressure() = runBlocking<Unit> {
+ val delayChannel = DelayChannel(delay = 100)
+ delayChannel.checkNotEmpty()
+ delayChannel.checkEmpty()
+
+ delay(150)
+ delayChannel.checkNotEmpty()
+ delay(52)
+ delayChannel.checkNotEmpty()
+ delay(50)
+ delayChannel.checkEmpty()
+ delay(52)
+ delayChannel.checkNotEmpty()
+ delayChannel.cancel()
+ }
+}
diff --git a/coroutines-guide.md b/coroutines-guide.md
index 90ff4be..5941471 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -88,6 +88,7 @@
* [Fan-out](#fan-out)
* [Fan-in](#fan-in)
* [Buffered channels](#buffered-channels)
+ * [Delay channels](#delay-channels)
* [Channels are fair](#channels-are-fair)
* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
* [The problem](#the-problem)
@@ -1666,6 +1667,63 @@
The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
+### Delay channels
+
+Delay channel is a special rendezvous channel, which emits `Unit` every time given delay passes since last consumption from this channel.
+Though it may seem to be useless standalone, it is a useful building block to create complex time-based [produce] operators, using this channel as one of [select] clauses and performing "timeout" action in its [onReceive][ReceiveChannel.onReceive].
+
+To create such channel, use factory methods [DelayChannel()] and [FixedDelayChannel()] and to indicate that no further elements are needed use [ReceiveChannel.cancel] method on it.
+
+Now let's see how it works in practice:
+<!--- INCLUDE
+import kotlin.coroutines.experimental.*
+-->
+
+```kotlin
+fun main(args: Array<String>) = runBlocking<Unit> {
+ val delayChannel = DelayChannel(delay = 100, initialDelay = 0) // create delay channel
+ var nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+ println("Initial element is available immediately: $nextElement") // Initial delay haven't passed yet
+
+ nextElement = withTimeoutOrNull(50) { delayChannel.receive() } // All subsequent elements has 100ms delay
+ println("Next element is not ready in 50 ms: $nextElement")
+
+ nextElement = withTimeoutOrNull(51) { delayChannel.receive() }
+ println("Next element is ready in 100 ms: $nextElement")
+
+ // Emulate large consumption delays
+ println("Consumer pause in 150ms")
+ delay(150)
+ // Next element is available immediately
+ nextElement = withTimeoutOrNull(1) { delayChannel.receive() }
+ println("Next element is available immediately after large consumer delay: $nextElement")
+ // Note that the pause between `receive` calls is taken into account and next element arrives faster
+ nextElement = withTimeoutOrNull(60) { delayChannel.receive() } // 60 instead of 50 to mitigate scheduler delays
+ println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
+
+ delayChannel.cancel() // indicate that no more elements are needed
+}
+```
+
+> You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt)
+
+It prints following lines:
+
+```text
+Initial element is available immediately: kotlin.Unit
+Next element is not ready in 50 ms: null
+Next element is ready in 100 ms: kotlin.Unit
+Consumer pause in 150ms
+Next element is available immediately after large consumer delay: kotlin.Unit
+Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
+```
+
+<!--- TEST -->
+
+Note that [DelayChannel()] is aware of possible consumer pauses and adapts next element emission if a pause occurs.
+[FixedDelayChannel()] doesn't do so and simply emits elements with fixed delay after consumption, but both have built-in backpressure
+via [RendezvousChannel]
+
### Channels are fair
Send and receive operations to channels are _fair_ with respect to the order of their invocation from
@@ -2444,8 +2502,12 @@
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel.html
-[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html
+[DelayChannel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-delay-channel.html
+[FixedDelayChannel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-fixed-delay-channel.html
+[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/cancel.html
+[RendezvousChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-rendezvous-channel/index.html
+[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive-or-null.html
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->