blob: 6a7e5c26be706b2d52a6ecc9b12467cec82e5783 [file] [log] [blame]
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +03003 */
4
5package kotlinx.coroutines.experimental.channels
6
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +03007import kotlinx.coroutines.experimental.*
Roman Elizarov8385ec92017-05-11 18:32:52 +03008import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
9import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
Roman Elizarovc8af35c2018-05-16 20:23:37 +030010import kotlinx.coroutines.experimental.internal.*
Roman Elizarovf5f09832018-05-16 15:10:28 +030011import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030012
13/**
14 * Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
Marko Devcic1d6230a2018-04-04 20:13:08 +020015 * that subscribe for the elements using [openSubscription] function and unsubscribe using [ReceiveChannel.cancel]
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030016 * function.
Roman Elizarov8385ec92017-05-11 18:32:52 +030017 *
Roman Elizarov36387472017-07-21 18:37:49 +030018 * See `BroadcastChannel()` factory function for the description of available
Roman Elizarov8385ec92017-05-11 18:32:52 +030019 * broadcast channel implementations.
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030020 */
21public interface BroadcastChannel<E> : SendChannel<E> {
22 /**
Roman Elizarov8385ec92017-05-11 18:32:52 +030023 * Factory for broadcast channels.
Roman Elizarov36387472017-07-21 18:37:49 +030024 * @suppress **Deprecated**
Roman Elizarov8385ec92017-05-11 18:32:52 +030025 */
26 public companion object Factory {
27 /**
28 * Creates a broadcast channel with the specified buffer capacity.
Roman Elizarov36387472017-07-21 18:37:49 +030029 * @suppress **Deprecated**
Roman Elizarov8385ec92017-05-11 18:32:52 +030030 */
Roman Elizarov36387472017-07-21 18:37:49 +030031 @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
32 public operator fun <E> invoke(capacity: Int): BroadcastChannel<E> = BroadcastChannel(capacity)
Roman Elizarov8385ec92017-05-11 18:32:52 +030033 }
34
35 /**
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030036 * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
Marko Devcic1d6230a2018-04-04 20:13:08 +020037 * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030038 * broadcast channel.
39 */
Marko Devcic1d6230a2018-04-04 20:13:08 +020040 public fun openSubscription(): ReceiveChannel<E>
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030041
42 /**
Roman Elizarovf5f09832018-05-16 15:10:28 +030043 * @suppress **Deprecated**: Return type changed to `ReceiveChannel`, this one left here for binary compatibility.
44 */
45 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Return type changed to `ReceiveChannel`, this one left here for binary compatibility")
Roman Elizarovc8af35c2018-05-16 20:23:37 +030046 @JvmName("openSubscription")
47 @Suppress("INAPPLICABLE_JVM_NAME")
48 public fun openSubscription1(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>
Roman Elizarovf5f09832018-05-16 15:10:28 +030049
50 /**
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030051 * @suppress **Deprecated**: Renamed to [openSubscription]
52 */
53 @Deprecated(message = "Renamed to `openSubscription`",
54 replaceWith = ReplaceWith("openSubscription()"))
Roman Elizarovf5f09832018-05-16 15:10:28 +030055 public fun open(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>
Roman Elizarov89f8ff72018-03-14 13:39:03 +030056
57 /**
58 * Cancels reception of remaining elements from this channel. This function closes the channel with
59 * the specified cause (unless it was already closed), removes all buffered sent elements from it,
60 * and [cancels][ReceiveChannel.cancel] all open subscriptions.
61 * This function returns `true` if the channel was not closed previously, or `false` otherwise.
62 *
63 * A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
64 * receive on a failed channel throw the specified [cause] exception.
65 */
66 public fun cancel(cause: Throwable? = null): Boolean
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030067}
68
69/**
Roman Elizarov36387472017-07-21 18:37:49 +030070 * Creates a broadcast channel with the specified buffer capacity.
71 *
72 * The resulting channel type depends on the specified [capacity] parameter:
Roman Elizarov89f8ff72018-03-14 13:39:03 +030073 * * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel]
74 * **Note:** this channel looses all items that are send to it until the first subscriber appears;
Roman Elizarov36387472017-07-21 18:37:49 +030075 * * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
76 * * otherwise -- throws [IllegalArgumentException].
77 */
78public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
79 when (capacity) {
80 0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
81 UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
82 CONFLATED -> ConflatedBroadcastChannel()
83 else -> ArrayBroadcastChannel(capacity)
84 }
85
86/**
Roman Elizarov0f66a6d2017-06-22 14:57:53 +030087 * Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030088 * open subscription and to [close] it to unsubscribe.
Roman Elizarovb555d912017-08-17 21:01:33 +030089 *
90 * Note, that invocation of [cancel] also closes subscription.
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030091 */
Roman Elizarovf5f09832018-05-16 15:10:28 +030092@Deprecated("Deprecated in favour of `ReceiveChannel`", replaceWith = ReplaceWith("ReceiveChannel"))
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030093public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
94 /**
Roman Elizarovb555d912017-08-17 21:01:33 +030095 * Closes this subscription. This is a synonym for [cancel].
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030096 */
Roman Elizarovf5f09832018-05-16 15:10:28 +030097 @Deprecated("Use `cancel`", replaceWith = ReplaceWith("cancel()"))
Roman Elizarovb555d912017-08-17 21:01:33 +030098 public override fun close() { cancel() }
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030099}
Vsevolod Tolstopyatov313978c2018-06-01 15:30:34 +0300100
101/** @suppress **Deprecated**: Left here for migration from SubscriptionReceiveChannel */
Roman Elizarovc038d632018-06-15 10:42:49 +0300102@Deprecated(
103 level = DeprecationLevel.WARNING,
104 message = "Use `ReceiveChannel<*>.consume` instead",
105 replaceWith = ReplaceWith("consume { let(block) }")
106)
107public inline fun <E, R> ReceiveChannel<E>.use(block: (ReceiveChannel<E>) -> R): R {
Vsevolod Tolstopyatov313978c2018-06-01 15:30:34 +0300108 var exception: Throwable? = null
109 try {
Roman Elizarovc038d632018-06-15 10:42:49 +0300110 return block(this)
Vsevolod Tolstopyatov313978c2018-06-01 15:30:34 +0300111 } catch (t: Throwable) {
112 exception = t
113 throw t
Roman Elizarovc038d632018-06-15 10:42:49 +0300114 } finally {
Vsevolod Tolstopyatov313978c2018-06-01 15:30:34 +0300115 this.cancel(exception)
116 }
117}