blob: e0b4f9d2a58c73a004915c348791cef3df75a5ff [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@file:JvmName("ChannelsKt")
@file:Suppress("DEPRECATION_ERROR")
@file:OptIn(ExperimentalContracts::class)
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.jvm.*
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
// -------- Operations on BroadcastChannel --------
/**
* Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
* from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
*
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
*/
@ObsoleteCoroutinesApi
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
val channel = openSubscription()
try {
return channel.block()
} finally {
channel.cancel()
}
}
/**
* This function is deprecated in the favour of [ReceiveChannel.receiveCatching].
*
* This function is considered error-prone for the following reasons;
* * Is throwing if the channel has failed even though its signature may suggest it returns 'null'
* * It is easy to forget that exception handling still have to be explicit
* * During code reviews and code reading, intentions of the code are frequently unclear:
* are potential exceptions ignored deliberately or not?
*
* @suppress doc
*/
@Deprecated(
"Deprecated in the favour of 'receiveCatching'",
ReplaceWith("receiveCatching().getOrNull()"),
DeprecationLevel.WARNING
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as ReceiveChannel<E?>).receiveOrNull()
}
/**
* This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching]
*/
@Deprecated(
"Deprecated in the favour of 'onReceiveCatching'",
level = DeprecationLevel.WARNING
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as ReceiveChannel<E?>).onReceiveOrNull
}
/**
* Makes sure that the given [block] consumes all elements from the given channel
* by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
*
* The operation is _terminal_.
*/
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
var cause: Throwable? = null
try {
return block()
} catch (e: Throwable) {
cause = e
throw e
} finally {
cancelConsumed(cause)
}
}
/**
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
* the channel after the execution of the block.
* If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (e in this) action(e)
}
/**
* Returns a [List] containing all elements.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*/
@OptIn(ExperimentalStdlibApi::class)
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
consumeEach {
add(it)
}
}
/**
* Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
*
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
*/
@ObsoleteCoroutinesApi
public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (element in this) action(element)
}
@PublishedApi
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
cancel(cause?.let {
it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
})
}