blob: 979a86e094e1b123603b0af87c3c621a8ef9462d [file] [log] [blame]
/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental.channels
18
19import kotlinx.coroutines.experimental.*
20import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
21import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
22import kotlinx.coroutines.experimental.intrinsics.*
23import kotlin.coroutines.experimental.*
24
/**
 * Re-publishes every element received from this channel through a newly created [BroadcastChannel].
 *
 * The forwarding coroutine is created by the [broadcast] builder in the [Unconfined] context and
 * receives the handler returned by [consumes] for this channel as its completion handler.
 *
 * @param capacity capacity of the broadcast channel's buffer (1 by default).
 * @param start coroutine start option. The default value is [CoroutineStart.LAZY].
 */
fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E> {
    val source = this
    return broadcast(
        context = Unconfined,
        capacity = capacity,
        start = start,
        onCompletion = consumes()
    ) {
        // Pump elements one by one from the source channel into the broadcast channel.
        val elements = source.iterator()
        while (elements.hasNext()) send(elements.next())
    }
}
40
/**
 * Launches a new coroutine that produces a stream of values by sending them to a broadcast channel
 * and returns a reference to that coroutine as a [BroadcastChannel]. Consumers
 * [subscribe][BroadcastChannel.openSubscription] to the returned object to receive the produced elements.
 *
 * The coroutine's scope is a [ProducerScope], which implements both [CoroutineScope] and [SendChannel],
 * so the coroutine can call [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
 * when the coroutine completes.
 *
 * The [context] for the new coroutine can be specified explicitly.
 * See [CoroutineDispatcher] for the standard context implementations provided by `kotlinx.coroutines`.
 * The [coroutineContext] of the parent coroutine may be used,
 * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
 * The parent job may also be specified explicitly using the [parent] parameter.
 *
 * If the context has neither a dispatcher nor any other [ContinuationInterceptor], [DefaultDispatcher] is used.
 *
 * Uncaught exceptions in this coroutine close the channel with the exception as the cause, and
 * the resulting channel becomes _failed_, so that any attempt to receive from it throws an exception.
 *
 * The kind of the resulting channel depends on the specified [capacity] parameter:
 * * when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses [ArrayBroadcastChannel];
 * * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
 * * otherwise -- throws [IllegalArgumentException].
 *
 * **Note:** By default, the coroutine does not start until the first subscriber appears via
 * [BroadcastChannel.openSubscription], because the [start] parameter defaults to [CoroutineStart.LAZY].
 * This ensures that the first subscriber does not miss any sent elements.
 * However, later subscribers may miss elements.
 *
 * See [newCoroutineContext] for a description of the debugging facilities available for a newly created coroutine.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param capacity capacity of the channel's buffer (1 by default).
 * @param start coroutine start option. The default value is [CoroutineStart.LAZY].
 * @param parent explicitly specifies the parent job, overrides the job from the [context] (if any).
 * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
 * @param block the coroutine code.
 */
public fun <E> broadcast(
    context: CoroutineContext = DefaultDispatcher,
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY,
    parent: Job? = null,
    onCompletion: CompletionHandler? = null,
    block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> {
    // The channel is built before anything else, so it is the first step that can fail.
    val broadcastChannel = BroadcastChannel<E>(capacity)
    val effectiveContext = newCoroutineContext(context, parent)
    // Lazy start options get the coroutine variant that starts itself on the first subscription.
    val producer: BroadcastCoroutine<E> = when {
        start.isLazy -> LazyBroadcastCoroutine(effectiveContext, broadcastChannel, block)
        else -> BroadcastCoroutine(effectiveContext, broadcastChannel, active = true)
    }
    // The completion handler is registered before the coroutine is started.
    onCompletion?.let { producer.invokeOnCompletion(handler = it) }
    producer.start(start, producer, block)
    return producer
}
98
/**
 * Producer coroutine that is itself a [BroadcastChannel]: all channel operations are delegated
 * to the wrapped [_channel], while the coroutine state comes from [AbstractCoroutine].
 *
 * @param parentContext context passed on to [AbstractCoroutine].
 * @param _channel the broadcast channel that elements are actually sent to.
 * @param active forwarded to [AbstractCoroutine] as its `active` argument.
 */
private open class BroadcastCoroutine<E>(
    parentContext: CoroutineContext,
    protected val _channel: BroadcastChannel<E>,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
    /** The coroutine itself is the [SendChannel] exposed through [ProducerScope]. */
    override val channel: SendChannel<E> get() = this

    // Workaround for KT-23094
    override suspend fun send(element: E) = _channel.send(element)

    /** Cancellation goes through the coroutine (superclass) implementation. */
    public override fun cancel(cause: Throwable?): Boolean {
        return super.cancel(cause)
    }

    /**
     * Propagates the outcome of the coroutine to the underlying channel and reports the cause
     * via [handleCoroutineException] when the channel did not accept it.
     */
    override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
        val cause = exceptionally?.cause
        val processed =
            if (exceptionally is Cancelled) {
                // producer coroutine was cancelled -- cancel channel
                _channel.cancel(cause)
            } else {
                // producer coroutine has completed -- close channel
                _channel.close(cause)
            }
        // Nothing more to do when the channel took the cause, or when there is no cause at all.
        if (processed || cause == null) return
        handleCoroutineException(context, cause)
    }
}
122
/**
 * [BroadcastCoroutine] variant that is created inactive (`active = false`) and starts itself
 * whenever a subscription is opened.
 *
 * @param parentContext context passed on to [BroadcastCoroutine].
 * @param channel the broadcast channel that elements are sent to.
 * @param block the coroutine code, started from [onStart].
 */
private class LazyBroadcastCoroutine<E>(
    parentContext: CoroutineContext,
    channel: BroadcastChannel<E>,
    private val block: suspend ProducerScope<E>.() -> Unit
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
    /**
     * Opens the subscription on the underlying channel _first_ and only then starts the coroutine,
     * returning the subscription that was opened.
     */
    override fun openSubscription(): ReceiveChannel<E> =
        _channel.openSubscription().also { start() }

    /** Starts [block] cancellably, with this coroutine as both its receiver and its completion. */
    override fun onStart() {
        block.startCoroutineCancellable(this, this)
    }
}
140