blob: f4fe830bb3cbfbf4ca1e3e51a0e5e83704d83a60 [file] [log] [blame]
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +03001package kotlinx.coroutines.experimental.channels
2
3import kotlinx.coroutines.experimental.*
4import kotlinx.coroutines.experimental.timeunit.*
5import kotlin.coroutines.experimental.*
6
/**
 * Scheduling mode for the [ticker] function: selects how elements are timed when the
 * consumer of the ticker channel falls behind.
 */
enum class TickerMode {
    /**
     * Keeps a fixed period between elements: when the consumer is late, the next delay is
     * adjusted so that subsequent elements stay on the original schedule.
     * **This is the default mode.**
     *
     * ```
     * val channel = ticker(delay = 100)
     * delay(350) // 250 ms late
     * println(channel.poll()) // prints Unit
     * println(channel.poll()) // prints null
     *
     * delay(50)
     * println(channel.poll()) // prints Unit, delay was adjusted
     * delay(50)
     * println(channel.poll()) // prints null, we are not late relative to the previous element
     * ```
     */
    FIXED_PERIOD,

    /**
     * Keeps a fixed delay between produced elements, even if the consumer cannot keep up
     * or is otherwise slow.
     */
    FIXED_DELAY
}
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030034
/**
 * Creates a channel that produces its first [Unit] element after [initialDelay] and then keeps
 * producing elements separated by [delay].
 *
 * The resulting channel is a [rendezvous channel][RendezvousChannel]: an element is handed over only
 * when a receiver is ready for it. When the receiver does not keep up, elements are not sent due to
 * backpressure, and the timing of the following elements is governed by [mode]
 * ([TickerMode.FIXED_PERIOD] by default). See [TickerMode] for details.
 *
 * This channel stops producing elements immediately after [ReceiveChannel.cancel] is invoked.
 *
 * **Note:** the producer of this channel is dispatched via the [Unconfined] dispatcher by default
 * and is started eagerly.
 *
 * @param delay delay between consecutive elements; must be non-negative.
 * @param unit unit of time that applies to [initialDelay] and [delay] (milliseconds by default).
 * @param initialDelay delay before the first element is produced (equal to [delay] by default);
 * must be non-negative.
 * @param context context of the producing coroutine; it is added on top of [Unconfined].
 * @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default).
 * @return the receiving side of the ticker channel.
 * @throws IllegalArgumentException if [delay] or [initialDelay] is negative.
 */
public fun ticker(
    delay: Long,
    unit: TimeUnit = TimeUnit.MILLISECONDS,
    initialDelay: Long = delay,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit> {
    require(delay >= 0) { "Expected non-negative delay, but has $delay" }
    require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }

    // Elements of the caller-supplied context take precedence over the Unconfined default.
    val producerContext = Unconfined + context
    return produce(producerContext, capacity = 0) {
        // Exhaustive over TickerMode on purpose: a new mode must be handled here explicitly.
        when (mode) {
            TickerMode.FIXED_PERIOD -> fixedPeriodTicker(
                delay = delay,
                unit = unit,
                initialDelay = initialDelay,
                channel = channel
            )
            TickerMode.FIXED_DELAY -> fixedDelayTicker(
                delay = delay,
                unit = unit,
                initialDelay = initialDelay,
                channel = channel
            )
        }
    }
}
70
/**
 * Producer loop for [TickerMode.FIXED_PERIOD].
 *
 * Sends [Unit] to [channel] forever, tracking a nanosecond deadline for the next element. After each
 * send the remaining time to the deadline is computed; if the deadline has already been reached, the
 * delay is re-aligned to the next period boundary instead of sending the missed elements.
 *
 * @param delay period between elements, expressed in [unit].
 * @param unit time unit of [delay] and [initialDelay].
 * @param initialDelay delay before the first element, expressed in [unit].
 * @param channel channel that receives the ticks.
 */
private suspend fun fixedPeriodTicker(
    delay: Long,
    unit: TimeUnit,
    initialDelay: Long,
    channel: SendChannel<Unit>
) {
    // Deadline (in timeSource nanoseconds) of the element that is about to be sent.
    var deadline = timeSource.nanoTime() + unit.toNanos(initialDelay)
    delay(initialDelay, unit)
    val periodNs = unit.toNanos(delay)
    while (true) {
        // Move the deadline to the following element before suspending in send.
        deadline += periodNs
        channel.send(Unit)
        val now = timeSource.nanoTime()
        val remainingNs = deadline - now
        val sleepNs = if (remainingNs <= 0L && periodNs != 0L) {
            // The deadline is already reached: wait only until the next period boundary.
            val untilNextBoundary = periodNs - (now - deadline) % periodNs
            deadline = now + untilNextBoundary
            untilNextBoundary
        } else {
            // On schedule (or a zero period): wait out the rest of the period, never a negative time.
            remainingNs.coerceAtLeast(0L)
        }
        delay(sleepNs, java.util.concurrent.TimeUnit.NANOSECONDS)
    }
}
Vsevolod Tolstopyatov03d2ff72018-05-29 17:28:20 +030094
/**
 * Producer loop for [TickerMode.FIXED_DELAY].
 *
 * Waits [initialDelay], then forever sends [Unit] to [channel] and waits [delay] after each send
 * completes, so time spent suspended in `send` is not compensated for.
 *
 * @param delay delay applied after every sent element, expressed in [unit].
 * @param unit time unit of [delay] and [initialDelay].
 * @param initialDelay delay before the first element, expressed in [unit].
 * @param channel channel that receives the ticks.
 */
private suspend fun fixedDelayTicker(
    delay: Long,
    unit: TimeUnit,
    initialDelay: Long,
    channel: SendChannel<Unit>
) {
    // The first pause is the initial delay; every later pause is the regular delay.
    var pause = initialDelay
    while (true) {
        delay(pause, unit)
        channel.send(Unit)
        pause = delay
    }
}