blob: 0c23b0aef71ba643c244d9b351cda64595c3d283 [file] [log] [blame]
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03001/*
2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +03005package kotlinx.coroutines.experimental.channels
6
7import kotlinx.coroutines.experimental.*
8import kotlinx.coroutines.experimental.timeunit.*
9import kotlin.coroutines.experimental.*
10
/**
 * Selects how [ticker] schedules its elements when the consumer falls behind.
 */
enum class TickerMode {
    /**
     * Keeps a fixed period between elements: when the consumer is slow or cannot keep up,
     * the delay before the next element is adjusted so that the original period is maintained.
     * **This is the default mode.**
     *
     * ```
     * val channel = ticker(delay = 100)
     * delay(350) // 250 ms late
     * println(channel.poll()) // prints Unit
     * println(channel.poll()) // prints null
     *
     * delay(50)
     * println(channel.poll()) // prints Unit, delay was adjusted
     * delay(50)
     * println(channel.poll()) // prints null, we are not late relative to the previous element
     * ```
     */
    FIXED_PERIOD,

    /**
     * Keeps a fixed delay between produced elements when the consumer is slow or cannot keep up.
     */
    FIXED_DELAY
}
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030038
/**
 * Creates a channel that emits its first item after [initialDelay] and every following item
 * with the given [delay] between them.
 *
 * The returned channel is a [rendezvous channel][RendezvousChannel]. If the receiver does not keep up
 * with the elements, they are not sent because of backpressure. How the ticker times its elements
 * in that situation is governed by the [mode] parameter, which is [TickerMode.FIXED_PERIOD] by default.
 * See [TickerMode] for details.
 *
 * The channel stops producing elements immediately after [ReceiveChannel.cancel] is invoked.
 *
 * **Note** the producer of this channel is dispatched via the [Unconfined] dispatcher by default
 * and is started eagerly.
 *
 * @param delay delay between each element.
 * @param unit unit of time that applies to [initialDelay] and [delay] (milliseconds by default).
 * @param initialDelay delay after which the first element is produced (equal to [delay] by default).
 * @param context context of the producing coroutine.
 * @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default).
 * @throws IllegalArgumentException if [delay] or [initialDelay] is negative.
 */
public fun ticker(
    delay: Long,
    unit: TimeUnit = TimeUnit.MILLISECONDS,
    initialDelay: Long = delay,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit> {
    require(delay >= 0) { "Expected non-negative delay, but has $delay" }
    require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }

    // Unconfined goes first so that elements of the caller-supplied context take precedence.
    val producerContext = Unconfined + context
    return produce(producerContext, capacity = 0) {
        when (mode) {
            TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delay, unit, initialDelay, channel)
            TickerMode.FIXED_DELAY -> fixedDelayTicker(delay, unit, initialDelay, channel)
        }
    }
}
74
/**
 * Producer loop for [TickerMode.FIXED_PERIOD]: waits [initialDelay], then repeatedly sends [Unit]
 * to [channel] and sleeps until the next deadline.
 *
 * When a send completes at or after the next deadline (and the period is non-zero), the deadline
 * is moved forward to the next multiple of the period counted from the missed deadline.
 */
private suspend fun fixedPeriodTicker(
    delay: Long,
    unit: TimeUnit,
    initialDelay: Long,
    channel: SendChannel<Unit>
) {
    var nextDeadline = timeSource.nanoTime() + unit.toNanos(initialDelay)
    delay(initialDelay, unit)
    val periodNanos = unit.toNanos(delay)
    while (true) {
        nextDeadline += periodNanos
        channel.send(Unit) // may suspend for as long as nobody receives
        val current = timeSource.nanoTime()
        val remaining = (nextDeadline - current).coerceAtLeast(0L)
        val pause = if (remaining == 0L && periodNanos != 0L) {
            // Deadline already reached: skip ahead to the next period boundary.
            val realigned = periodNanos - (current - nextDeadline) % periodNanos
            nextDeadline = current + realigned
            realigned
        } else {
            remaining
        }
        delay(pause, java.util.concurrent.TimeUnit.NANOSECONDS)
    }
}
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030098
/**
 * Producer loop for [TickerMode.FIXED_DELAY]: waits [initialDelay], then alternates forever between
 * sending [Unit] to [channel] and waiting [delay], both measured in [unit].
 */
private suspend fun fixedDelayTicker(
    delay: Long,
    unit: TimeUnit,
    initialDelay: Long,
    channel: SendChannel<Unit>
) {
    // The first pause is the initial delay; every later pause is the regular delay.
    var pause = initialDelay
    while (true) {
        delay(pause, unit)
        channel.send(Unit)
        pause = delay
    }
}