blob: d9d21de6bd154366a003942f806180b11437e793 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.channels
6
7import kotlinx.coroutines.experimental.*
8import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
9import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
10import kotlinx.coroutines.experimental.intrinsics.*
11import kotlin.coroutines.experimental.*
12
/**
 * Broadcasts every element of this channel through a new [BroadcastChannel].
 *
 * The elements are forwarded by a coroutine started in the [Unconfined] context, and the handler
 * returned by [consumes] for this channel is installed as that coroutine's completion handler.
 *
 * @param capacity capacity of the broadcast channel's buffer (1 by default).
 * @param start coroutine start option. The default value is [CoroutineStart.LAZY].
 */
public fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E> {
    // Capture the receiver explicitly so the producer lambda does not need a labeled `this`.
    val source = this
    return broadcast(
        context = Unconfined,
        capacity = capacity,
        start = start,
        onCompletion = consumes()
    ) {
        // Forward each received element to the broadcast channel, one at a time.
        for (element in source) send(element)
    }
}
28
/**
 * Starts a new coroutine that produces a stream of values by sending them to a broadcast channel,
 * and returns a reference to that coroutine as a [BroadcastChannel]. The returned object can be used
 * to [subscribe][BroadcastChannel.openSubscription] to the elements produced by the coroutine.
 *
 * The coroutine runs with a [ProducerScope] receiver, which implements both [CoroutineScope] and
 * [SendChannel], so [block] can call [send][SendChannel.send] directly. The channel is
 * [closed][SendChannel.close] when the coroutine completes.
 *
 * The [context] for the new coroutine can be specified explicitly.
 * See [CoroutineDispatcher] for the standard context implementations provided by `kotlinx.coroutines`.
 * The [coroutineContext] of the parent coroutine may be used, in which case the [Job] of the resulting
 * coroutine is a child of the parent coroutine's job.
 * The parent job may also be specified explicitly using the [parent] parameter.
 *
 * If the context has neither a dispatcher nor any other [ContinuationInterceptor],
 * then [DefaultDispatcher] is used.
 *
 * An uncaught exception in this coroutine closes the channel with that exception as the cause, and
 * the resulting channel becomes _failed_, so that any attempt to receive from it throws an exception.
 *
 * The kind of the resulting channel depends on the [capacity] parameter:
 * * when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses [ArrayBroadcastChannel];
 * * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
 * * otherwise -- throws [IllegalArgumentException].
 *
 * **Note:** By default, the coroutine does not start until the first subscriber appears via
 * [BroadcastChannel.openSubscription], because the [start] parameter defaults to [CoroutineStart.LAZY].
 * This ensures that the first subscriber does not miss any sent elements.
 * However, later subscribers may miss elements.
 *
 * See [newCoroutineContext] for a description of the debugging facilities available for a newly created coroutine.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param capacity capacity of the channel's buffer (1 by default).
 * @param start coroutine start option. The default value is [CoroutineStart.LAZY].
 * @param parent explicitly specifies the parent job, overrides the job from the [context] (if any).
 * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
 * @param block the coroutine code.
 */
public fun <E> broadcast(
    context: CoroutineContext = DefaultDispatcher,
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY,
    parent: Job? = null,
    onCompletion: CompletionHandler? = null,
    block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> {
    val broadcastChannel = BroadcastChannel<E>(capacity)
    val effectiveContext = newCoroutineContext(context, parent)
    // A lazy start needs the variant that starts itself when a subscription is opened;
    // every other start mode uses the plain coroutine, created in the active state.
    val producer: BroadcastCoroutine<E> = when {
        start.isLazy -> LazyBroadcastCoroutine(effectiveContext, broadcastChannel, block)
        else -> BroadcastCoroutine(effectiveContext, broadcastChannel, active = true)
    }
    // Register the completion handler before the coroutine is started.
    if (onCompletion != null) {
        producer.invokeOnCompletion(handler = onCompletion)
    }
    producer.start(start, producer, block)
    return producer
}
86
/**
 * Producer coroutine backing the [broadcast] builder.
 *
 * It is a coroutine ([AbstractCoroutine]), a [ProducerScope] for the producer block and a
 * [BroadcastChannel] at the same time; all broadcast channel operations are delegated to [_channel].
 *
 * @param parentContext context passed on to [AbstractCoroutine].
 * @param _channel the broadcast channel that receives all delegated channel operations.
 * @param active the `active` flag passed on to [AbstractCoroutine].
 */
private open class BroadcastCoroutine<E>(
    parentContext: CoroutineContext,
    protected val _channel: BroadcastChannel<E>,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
    /** The send side of the producer scope is this coroutine itself. */
    override val channel: SendChannel<E>
        get() = this

    // Explicitly routes cancellation to the coroutine implementation.
    // NOTE(review): presumably needed because `cancel` is also provided by the delegated channel -- confirm.
    public override fun cancel(cause: Throwable?): Boolean {
        return super.cancel(cause)
    }

    /**
     * Propagates the outcome of the coroutine to the underlying channel and reports the cause
     * via [handleCoroutineException] when the channel did not process it.
     */
    override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
        val cause = exceptionally?.cause
        val processed =
            if (exceptionally is Cancelled) {
                // producer coroutine was cancelled -- cancel channel
                _channel.cancel(cause)
            } else {
                // producer coroutine has completed -- close channel
                _channel.close(cause)
            }
        // The channel did not process the cause, so hand it to the coroutine exception handling.
        if (cause != null && !processed) {
            handleCoroutineException(context, cause)
        }
    }

    // Workaround for KT-23094
    override suspend fun send(element: E) = _channel.send(element)
}
110
/**
 * Variant of [BroadcastCoroutine] that is created inactive and starts running [block]
 * when a subscription is opened.
 *
 * @param parentContext context passed on to [BroadcastCoroutine].
 * @param channel the broadcast channel that receives all delegated channel operations.
 * @param block the producer code that is started from [onStart].
 */
private class LazyBroadcastCoroutine<E>(
    parentContext: CoroutineContext,
    channel: BroadcastChannel<E>,
    private val block: suspend ProducerScope<E>.() -> Unit
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
    /**
     * Opens a subscription on the underlying channel and then starts this coroutine.
     * The subscription is opened _first_, before the coroutine is started.
     */
    override fun openSubscription(): ReceiveChannel<E> =
        _channel.openSubscription().also {
            // The subscription already exists at this point; only now start the coroutine.
            start()
        }

    /** Runs [block] as a cancellable coroutine, with this object as both receiver and completion. */
    override fun onStart() {
        block.startCoroutineCancellable(this, this)
    }
}
128