Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 1 | /* |
Aurimas Liutikas | 7b14046 | 2021-05-12 21:56:16 +0000 | [diff] [blame] | 2 | * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.channels |
| 6 | |
| 7 | import kotlinx.coroutines.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 8 | import kotlin.coroutines.* |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 9 | |
| 10 | /** |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 11 | * Scope for the [produce][CoroutineScope.produce] coroutine builder. |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 12 | * |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 13 | * **Note: This is an experimental api.** Behavior of producers that work as children in a parent scope with respect |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 14 | * to cancellation and error handling may change in the future. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 15 | */ |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 16 | @ExperimentalCoroutinesApi |
Roman Elizarov | f1d9a4e | 2017-04-05 10:53:14 +0300 | [diff] [blame] | 17 | public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> { |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 18 | /** |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 19 | * A reference to the channel this coroutine [sends][send] elements to. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 20 | * It is provided for convenience, so that the code in the coroutine can refer |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 21 | * to the channel as `channel` as opposed to `this`. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 22 | * All the [SendChannel] functions on this interface delegate to |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 23 | * the channel instance returned by this property. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 24 | */ |
Vsevolod Tolstopyatov | 9cbad7d | 2020-03-27 17:43:45 +0300 | [diff] [blame] | 25 | public val channel: SendChannel<E> |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 26 | } |
| 27 | |
| 28 | /** |
Vsevolod Tolstopyatov | b08d61c | 2019-05-28 00:37:46 +0300 | [diff] [blame] | 29 | * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel] |
Roman Elizarov | 8773a26 | 2020-10-12 19:09:48 +0300 | [diff] [blame] | 30 | * and invokes the given [block] before resuming the coroutine. |
| 31 | * |
| 32 | * This suspending function is cancellable. |
| 33 | * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was |
| 34 | * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details. |
Vsevolod Tolstopyatov | b08d61c | 2019-05-28 00:37:46 +0300 | [diff] [blame] | 35 | * |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 36 | * Note that when the producer channel is cancelled, this function resumes with a cancellation exception. |
| 37 | * Therefore, in case of cancellation, no code after the call to this function will be executed. |
| 38 | * That's why this function takes a lambda parameter. |
Vsevolod Tolstopyatov | b08d61c | 2019-05-28 00:37:46 +0300 | [diff] [blame] | 39 | * |
| 40 | * Example of usage: |
| 41 | * ``` |
| 42 | * val callbackEventsStream = produce { |
| 43 | * val disposable = registerChannelInCallback(channel) |
| 44 | * awaitClose { disposable.dispose() } |
| 45 | * } |
| 46 | * ``` |
| 47 | */ |
| 48 | @ExperimentalCoroutinesApi |
Louis CAD | b5a8493 | 2019-07-03 11:31:12 +0200 | [diff] [blame] | 49 | public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 50 | check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can only be invoked from the producer context" } |
Vsevolod Tolstopyatov | b08d61c | 2019-05-28 00:37:46 +0300 | [diff] [blame] | 51 | try { |
| 52 | suspendCancellableCoroutine<Unit> { cont -> |
| 53 | invokeOnClose { |
| 54 | cont.resume(Unit) |
| 55 | } |
| 56 | } |
| 57 | } finally { |
| 58 | block() |
| 59 | } |
| 60 | } |
| 61 | |
| 62 | /** |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 63 | * Launches a new coroutine to produce a stream of values by sending them to a channel |
Roman Elizarov | b555d91 | 2017-08-17 21:01:33 +0300 | [diff] [blame] | 64 | * and returns a reference to the coroutine as a [ReceiveChannel]. This resulting |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 65 | * object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 66 | * |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 67 | * The scope of the coroutine contains the [ProducerScope] interface, which implements |
| 68 | * both [CoroutineScope] and [SendChannel], so that the coroutine can invoke |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 69 | * [send][SendChannel.send] directly. The channel is [closed][SendChannel.close] |
| 70 | * when the coroutine completes. |
Roman Elizarov | b555d91 | 2017-08-17 21:01:33 +0300 | [diff] [blame] | 71 | * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel]. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 72 | * |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 73 | * The coroutine context is inherited from this [CoroutineScope]. Additional context elements can be specified with the [context] argument. |
| 74 | * If the context does not have any dispatcher or other [ContinuationInterceptor], then [Dispatchers.Default] is used. |
| 75 | * The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden |
Marek Langiewicz | f86af23 | 2019-10-06 14:10:26 +0200 | [diff] [blame] | 76 | * with a corresponding [context] element. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 77 | * |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 78 | * Any uncaught exception in this coroutine will close the channel with this exception as the cause and |
| 79 | * the resulting channel will become _failed_, so that any attempt to receive from it thereafter will throw an exception. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 80 | * |
Roman Elizarov | 5633f91 | 2018-09-23 19:08:36 +0300 | [diff] [blame] | 81 | * The kind of the resulting channel depends on the specified [capacity] parameter. |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 82 | * See the [Channel] interface documentation for details. |
Roman Elizarov | 89f8ff7 | 2018-03-14 13:39:03 +0300 | [diff] [blame] | 83 | * |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 84 | * See [newCoroutineContext] for a description of debugging facilities available for newly created coroutines. |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 85 | * |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 86 | * **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect |
| 87 | * to cancellation and error handling may change in the future. |
| 88 | * |
Roman Elizarov | dc29b07 | 2018-09-11 18:42:03 +0300 | [diff] [blame] | 89 | * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. |
Roman Elizarov | c0d559b | 2017-09-28 14:27:05 +0300 | [diff] [blame] | 90 | * @param capacity capacity of the channel's buffer (no buffer by default). |
| 91 | * @param block the coroutine code. |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 92 | */ |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 93 | @ExperimentalCoroutinesApi |
| 94 | public fun <E> CoroutineScope.produce( |
| 95 | context: CoroutineContext = EmptyCoroutineContext, |
| 96 | capacity: Int = 0, |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 97 | @BuilderInference block: suspend ProducerScope<E>.() -> Unit |
Roman Elizarov | 34c3464 | 2020-10-13 14:02:52 +0300 | [diff] [blame] | 98 | ): ReceiveChannel<E> = |
| 99 | produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block) |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 100 | |
| 101 | /** |
Yanis Batura | a7afd46 | 2019-08-05 20:32:34 +0700 | [diff] [blame] | 102 | * **This is an internal API and should not be used from general code.** |
| 103 | * The `onCompletion` parameter will be redesigned. |
| 104 | * If you have to use the `onCompletion` operator, please report to https://github.com/Kotlin/kotlinx.coroutines/issues/. |
Vsevolod Tolstopyatov | c81dc91 | 2019-05-31 13:16:19 +0300 | [diff] [blame] | 105 | * As a temporary solution, [invokeOnCompletion][Job.invokeOnCompletion] can be used instead: |
| 106 | * ``` |
| 107 | * fun <E> ReceiveChannel<E>.myOperator(): ReceiveChannel<E> = GlobalScope.produce(Dispatchers.Unconfined) { |
| 108 | * coroutineContext[Job]?.invokeOnCompletion { consumes() } |
| 109 | * } |
| 110 | * ``` |
| 111 | * @suppress |
Roman Elizarov | 27b8f45 | 2018-09-20 21:23:41 +0300 | [diff] [blame] | 112 | */ |
| 113 | @InternalCoroutinesApi |
Vsevolod Tolstopyatov | 79414ec | 2018-08-30 16:50:56 +0300 | [diff] [blame] | 114 | public fun <E> CoroutineScope.produce( |
| 115 | context: CoroutineContext = EmptyCoroutineContext, |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 116 | capacity: Int = 0, |
Vsevolod Tolstopyatov | 4a53d23 | 2020-03-03 15:33:40 +0300 | [diff] [blame] | 117 | start: CoroutineStart = CoroutineStart.DEFAULT, |
Roman Elizarov | 55a66ac | 2018-03-12 20:15:07 +0300 | [diff] [blame] | 118 | onCompletion: CompletionHandler? = null, |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 119 | @BuilderInference block: suspend ProducerScope<E>.() -> Unit |
Roman Elizarov | 34c3464 | 2020-10-13 14:02:52 +0300 | [diff] [blame] | 120 | ): ReceiveChannel<E> = |
| 121 | produce(context, capacity, BufferOverflow.SUSPEND, start, onCompletion, block) |
| 122 | |
| 123 | // Internal version of produce that is maximally flexible, but is not exposed through public API (too many params) |
| 124 | internal fun <E> CoroutineScope.produce( |
| 125 | context: CoroutineContext = EmptyCoroutineContext, |
| 126 | capacity: Int = 0, |
| 127 | onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, |
| 128 | start: CoroutineStart = CoroutineStart.DEFAULT, |
| 129 | onCompletion: CompletionHandler? = null, |
| 130 | @BuilderInference block: suspend ProducerScope<E>.() -> Unit |
Roman Elizarov | b555d91 | 2017-08-17 21:01:33 +0300 | [diff] [blame] | 131 | ): ReceiveChannel<E> { |
Roman Elizarov | 34c3464 | 2020-10-13 14:02:52 +0300 | [diff] [blame] | 132 | val channel = Channel<E>(capacity, onBufferOverflow) |
Vsevolod Tolstopyatov | 79414ec | 2018-08-30 16:50:56 +0300 | [diff] [blame] | 133 | val newContext = newCoroutineContext(context) |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 134 | val coroutine = ProducerCoroutine(newContext, channel) |
Roman Elizarov | 55a66ac | 2018-03-12 20:15:07 +0300 | [diff] [blame] | 135 | if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion) |
Vsevolod Tolstopyatov | 4a53d23 | 2020-03-03 15:33:40 +0300 | [diff] [blame] | 136 | coroutine.start(start, coroutine, block) |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 137 | return coroutine |
Roman Elizarov | a5e653f | 2017-02-13 13:49:55 +0300 | [diff] [blame] | 138 | } |
| 139 | |
Vsevolod Tolstopyatov | 0342a0a | 2019-08-22 15:20:59 +0300 | [diff] [blame] | 140 | internal open class ProducerCoroutine<E>( |
Roman Elizarov | 9faa61e | 2018-02-22 23:20:28 +0300 | [diff] [blame] | 141 | parentContext: CoroutineContext, channel: Channel<E> |
Vsevolod Tolstopyatov | d92b0fa | 2018-10-08 19:41:18 +0300 | [diff] [blame] | 142 | ) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> { |
Vsevolod Tolstopyatov | 79414ec | 2018-08-30 16:50:56 +0300 | [diff] [blame] | 143 | override val isActive: Boolean |
Vsevolod Tolstopyatov | d92b0fa | 2018-10-08 19:41:18 +0300 | [diff] [blame] | 144 | get() = super.isActive |
Vsevolod Tolstopyatov | 79414ec | 2018-08-30 16:50:56 +0300 | [diff] [blame] | 145 | |
Roman Elizarov | 6227c64 | 2019-03-19 13:22:19 +0300 | [diff] [blame] | 146 | override fun onCompleted(value: Unit) { |
| 147 | _channel.close() |
| 148 | } |
| 149 | |
| 150 | override fun onCancelled(cause: Throwable, handled: Boolean) { |
Roman Elizarov | 6685fd0 | 2018-09-25 13:23:53 +0300 | [diff] [blame] | 151 | val processed = _channel.close(cause) |
Roman Elizarov | 6227c64 | 2019-03-19 13:22:19 +0300 | [diff] [blame] | 152 | if (!processed && !handled) handleCoroutineException(context, cause) |
Roman Elizarov | 9faa61e | 2018-02-22 23:20:28 +0300 | [diff] [blame] | 153 | } |
| 154 | } |