blob: 0c23b0aef71ba643c244d9b351cda64595c3d283 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.timeunit.*
import kotlin.coroutines.experimental.*
/**
* Mode for [ticker] function.
*/
enum class TickerMode {
/**
* Adjust delay to maintain fixed period if consumer cannot keep up or is otherwise slow.
* **This is a default mode.**
*
* ```
* val channel = ticker(delay = 100)
* delay(350) // 250 ms late
* println(channel.poll()) // prints Unit
* println(channel.poll()) // prints null
*
* delay(50)
* println(channel.poll()) // prints Unit, delay was adjusted
* delay(50)
* println(channel.poll()) // prints null, we'are not late relatively to previous element
* ```
*/
FIXED_PERIOD,
/**
* Maintains fixed delay between produced elements if consumer cannot keep up or it otherwise slow.
*/
FIXED_DELAY
}
/**
* Creates a channel that produces the first item after the given initial delay and subsequent items with the
* given delay between them.
*
* The resulting channel is a [rendezvous channel][RendezvousChannel]. When receiver from this channel does not keep
* up with receiving the elements from this channel, they are not being being send due to backpressure. The actual
* timing behavior of ticker in this case is controlled by [mode] parameter which
* is set to [TickerMode.FIXED_PERIOD] by default. See [TickerMode] for other details.
*
* This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
*
* **Note** producer to this channel is dispatched via [Unconfined] dispatcher by default and started eagerly.
*
* @param delay delay between each element.
* @param unit unit of time that applies to [initialDelay] and [delay] (in milliseconds by default).
* @param initialDelay delay after which the first element will be produced (it is equal to [delay] by default).
* @param context context of the producing coroutine.
* @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default).
*/
public fun ticker(
delay: Long,
unit: TimeUnit = TimeUnit.MILLISECONDS,
initialDelay: Long = delay,
context: CoroutineContext = EmptyCoroutineContext,
mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit> {
require(delay >= 0) { "Expected non-negative delay, but has $delay" }
require(initialDelay >= 0) { "Expected non-negative initial delay, but has $initialDelay" }
return produce(Unconfined + context, capacity = 0) {
when(mode) {
TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delay, unit, initialDelay, channel)
TickerMode.FIXED_DELAY -> fixedDelayTicker(delay, unit, initialDelay, channel)
}
}
}
private suspend fun fixedPeriodTicker(
delay: Long,
unit: TimeUnit,
initialDelay: Long,
channel: SendChannel<Unit>
) {
var deadline = timeSource.nanoTime() + unit.toNanos(initialDelay)
delay(initialDelay, unit)
val delayNs = unit.toNanos(delay)
while (true) {
deadline += delayNs
channel.send(Unit)
val now = timeSource.nanoTime()
val nextDelay = (deadline - now).coerceAtLeast(0)
if (nextDelay == 0L && delayNs != 0L) {
val adjustedDelay = delayNs - (now - deadline) % delayNs
deadline = now + adjustedDelay
delay(adjustedDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
} else {
delay(nextDelay, java.util.concurrent.TimeUnit.NANOSECONDS)
}
}
}
private suspend fun fixedDelayTicker(
delay: Long,
unit: TimeUnit,
initialDelay: Long,
channel: SendChannel<Unit>
) {
delay(initialDelay, unit)
while (true) {
channel.send(Unit)
delay(delay, unit)
}
}