* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.jvm.*
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
// -------- Operations on BroadcastChannel --------
* Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
* from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
* See [issue #254](
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
val channel = openSubscription()
try {
return channel.block()
} finally {
* This function is deprecated in the favour of [ReceiveChannel.receiveCatching].
* This function is considered error-prone for the following reasons;
* * Is throwing if the channel has failed even though its signature may suggest it returns 'null'
* * It is easy to forget that exception handling still have to be explicit
* * During code reviews and code reading, intentions of the code are frequently unclear:
* are potential exceptions ignored deliberately or not?
* @suppress doc
"Deprecated in the favour of 'receiveCatching'",
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
return (this as ReceiveChannel<E?>).receiveOrNull()
* This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching]
"Deprecated in the favour of 'onReceiveCatching'",
level = DeprecationLevel.WARNING
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
return (this as ReceiveChannel<E?>).onReceiveOrNull
* Makes sure that the given [block] consumes all elements from the given channel
* by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
* The operation is _terminal_.
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
var cause: Throwable? = null
try {
return block()
} catch (e: Throwable) {
cause = e
throw e
} finally {
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
* the channel after the execution of the block.
* If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (e in this) action(e)
* Returns a [List] containing all elements.
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
consumeEach {
* Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
* See [issue #254](
public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (element in this) action(element)
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
cancel(cause?.let {
it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)