Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2017 JetBrains s.r.o. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package kotlinx.coroutines.experimental.channels |
| 18 | |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 19 | import kotlinx.coroutines.experimental.* |
Roman Elizarov | 89f8ff7 | 2018-03-14 13:39:03 +0300 | [diff] [blame] | 20 | import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED |
Roman Elizarov | 23fb728 | 2018-01-24 23:09:42 +0300 | [diff] [blame] | 21 | import kotlinx.coroutines.experimental.intrinsics.* |
Roman Elizarov | 2adf8bc | 2018-01-24 20:09:57 +0300 | [diff] [blame] | 22 | import kotlinx.coroutines.experimental.selects.* |
| 23 | import kotlin.coroutines.experimental.* |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 24 | |
| 25 | /** |
| 26 | * Scope for [actor] coroutine builder. |
| 27 | */ |
Roman Elizarov | c18271c | 2017-03-21 18:17:26 +0300 | [diff] [blame] | 28 | public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> { |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 29 | /** |
| 30 | * A reference to the mailbox channel that this coroutine [receives][receive] messages from. |
| 31 | * It is provided for convenience, so that the code in the coroutine can refer |
| 32 | * to the channel as `channel` as apposed to `this`. |
| 33 | * All the [ReceiveChannel] functions on this interface delegate to |
| 34 | * the channel instance returned by this function. |
| 35 | */ |
Roman Elizarov | c18271c | 2017-03-21 18:17:26 +0300 | [diff] [blame] | 36 | val channel: Channel<E> |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 37 | } |
| 38 | |
| 39 | /** |
Roman Elizarov | b555d91 | 2017-08-17 21:01:33 +0300 | [diff] [blame] | 40 | * @suppress **Deprecated**: Use `SendChannel`. |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 41 | */ |
Roman Elizarov | b555d91 | 2017-08-17 21:01:33 +0300 | [diff] [blame] | 42 | @Deprecated(message = "Use `SendChannel`", replaceWith = ReplaceWith("SendChannel")) |
| 43 | interface ActorJob<in E> : SendChannel<E> { |
| 44 | @Deprecated(message = "Use SendChannel itself") |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 45 | val channel: SendChannel<E> |
| 46 | } |
| 47 | |
| 48 | /** |
| 49 | * Launches new coroutine that is receiving messages from its mailbox channel |
Roman Elizarov | 48aa20b | 2018-01-29 17:25:28 +0300 | [diff] [blame] | 50 | * and returns a reference to its mailbox channel as a [SendChannel]. The resulting |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 51 | * object can be used to [send][SendChannel.send] messages to this coroutine. |
| 52 | * |
| 53 | * The scope of the coroutine contains [ActorScope] interface, which implements |
| 54 | * both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke |
| 55 | * [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close] |
| 56 | * when the coroutine completes. |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 57 | * |
Roman Elizarov | 66f018c | 2017-09-29 21:39:03 +0300 | [diff] [blame] | 58 | * The [context] for the new coroutine can be explicitly specified. |
| 59 | * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`. |
Roman Elizarov | c7d10a4 | 2018-03-13 18:28:42 +0300 | [diff] [blame] | 60 | * The [coroutineContext] of the parent coroutine may be used, |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 61 | * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine. |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 62 | * The parent job may be also explicitly specified using [parent] parameter. |
| 63 | * |
Roman Elizarov | 66f018c | 2017-09-29 21:39:03 +0300 | [diff] [blame] | 64 | * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used. |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 65 | * |
Roman Elizarov | 7b10c94 | 2017-05-16 21:02:51 +0300 | [diff] [blame] | 66 | * By default, the coroutine is immediately scheduled for execution. |
| 67 | * Other options can be specified via `start` parameter. See [CoroutineStart] for details. |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 68 | * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case, |
Roman Elizarov | 48aa20b | 2018-01-29 17:25:28 +0300 | [diff] [blame] | 69 | * it will be started implicitly on the first message |
| 70 | * [sent][SendChannel.send] to this actors's mailbox channel. |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 71 | * |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 72 | * Uncaught exceptions in this coroutine close the channel with this exception as a cause and |
| 73 | * the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception. |
| 74 | * |
Roman Elizarov | 89f8ff7 | 2018-03-14 13:39:03 +0300 | [diff] [blame] | 75 | * The kind of the resulting channel depends on the specified [capacity] parameter: |
| 76 | * * when `capacity` is 0 (default) -- uses [RendezvousChannel] without a buffer; |
| 77 | * * when `capacity` is [Channel.UNLIMITED] -- uses [LinkedListChannel] with buffer of unlimited size; |
| 78 | * * when `capacity` is [Channel.CONFLATED] -- uses [ConflatedChannel] that conflates back-to-back sends; |
| 79 | * * when `capacity` is positive, but less than [UNLIMITED] -- uses [ArrayChannel] with a buffer of the specified `capacity`; |
| 80 | * * otherwise -- throws [IllegalArgumentException]. |
| 81 | * |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 82 | * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine. |
| 83 | * |
Roman Elizarov | 48aa20b | 2018-01-29 17:25:28 +0300 | [diff] [blame] | 84 | * ### Using actors |
| 85 | * |
| 86 | * A typical usage of the actor builder looks like this: |
| 87 | * |
| 88 | * ``` |
| 89 | * val c = actor { |
| 90 | * // initialize actor's state |
| 91 | * for (msg in channel) { |
| 92 | * // process message here |
| 93 | * } |
| 94 | * } |
| 95 | * // send messages to the actor |
| 96 | * c.send(...) |
| 97 | * ... |
| 98 | * // stop the actor when it is no longer needed |
| 99 | * c.close() |
| 100 | * ``` |
| 101 | * |
| 102 | * ### Stopping and cancelling actors |
| 103 | * |
| 104 | * When the inbox channel of the actor is [closed][SendChannel.close] it sends a special "close token" to the actor. |
| 105 | * The actor still processes all the messages that were already sent and then "`for (msg in channel)`" loop terminates |
| 106 | * and the actor completes. |
| 107 | * |
| 108 | * If the actor needs to be aborted without processing all the messages that were already sent to it, then |
| 109 | * it shall be created with a parent job: |
| 110 | * |
| 111 | * ``` |
| 112 | * val job = Job() |
| 113 | * val c = actor(parent = job) { ... } |
| 114 | * ... |
| 115 | * // abort the actor |
| 116 | * job.cancel() |
| 117 | * ``` |
| 118 | * |
| 119 | * When actor's parent job is [cancelled][Job.cancel], then actor's job becomes cancelled. It means that |
| 120 | * "`for (msg in channel)`" and other cancellable suspending functions throw [CancellationException] and actor |
| 121 | * completes without processing remaining messages. |
| 122 | * |
Roman Elizarov | 66f018c | 2017-09-29 21:39:03 +0300 | [diff] [blame] | 123 | * @param context context of the coroutine. The default value is [DefaultDispatcher]. |
| 124 | * @param capacity capacity of the channel's buffer (no buffer by default). |
| 125 | * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 126 | * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).* |
Roman Elizarov | fe8ba6b | 2018-03-13 17:34:29 +0300 | [diff] [blame] | 127 | * @param onCompletion optional completion handler for the actor coroutine (see [Job.invokeOnCompletion]). |
Roman Elizarov | 66f018c | 2017-09-29 21:39:03 +0300 | [diff] [blame] | 128 | * @param block the coroutine code. |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 129 | */ |
| 130 | public fun <E> actor( |
Roman Elizarov | 66f018c | 2017-09-29 21:39:03 +0300 | [diff] [blame] | 131 | context: CoroutineContext = DefaultDispatcher, |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 132 | capacity: Int = 0, |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 133 | start: CoroutineStart = CoroutineStart.DEFAULT, |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 134 | parent: Job? = null, |
Roman Elizarov | fe8ba6b | 2018-03-13 17:34:29 +0300 | [diff] [blame] | 135 | onCompletion: CompletionHandler? = null, |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 136 | block: suspend ActorScope<E>.() -> Unit |
Roman Elizarov | b555d91 | 2017-08-17 21:01:33 +0300 | [diff] [blame] | 137 | ): SendChannel<E> { |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 138 | val newContext = newCoroutineContext(context, parent) |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 139 | val channel = Channel<E>(capacity) |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 140 | val coroutine = if (start.isLazy) |
| 141 | LazyActorCoroutine(newContext, channel, block) else |
| 142 | ActorCoroutine(newContext, channel, active = true) |
Roman Elizarov | fe8ba6b | 2018-03-13 17:34:29 +0300 | [diff] [blame] | 143 | if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion) |
Roman Elizarov | 2adf8bc | 2018-01-24 20:09:57 +0300 | [diff] [blame] | 144 | coroutine.start(start, coroutine, block) |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 145 | return coroutine |
| 146 | } |
| 147 | |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 148 | /** @suppress **Deprecated**: Binary compatibility */ |
| 149 | @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) |
| 150 | public fun <E> actor( |
| 151 | context: CoroutineContext = DefaultDispatcher, |
| 152 | capacity: Int = 0, |
| 153 | start: CoroutineStart = CoroutineStart.DEFAULT, |
Roman Elizarov | fe8ba6b | 2018-03-13 17:34:29 +0300 | [diff] [blame] | 154 | parent: Job? = null, |
| 155 | block: suspend ActorScope<E>.() -> Unit |
| 156 | ): SendChannel<E> = actor(context, capacity, start, parent, block = block) |
| 157 | |
| 158 | /** @suppress **Deprecated**: Binary compatibility */ |
| 159 | @Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) |
| 160 | public fun <E> actor( |
| 161 | context: CoroutineContext = DefaultDispatcher, |
| 162 | capacity: Int = 0, |
| 163 | start: CoroutineStart = CoroutineStart.DEFAULT, |
Roman Elizarov | e8f694e | 2017-11-28 10:12:00 +0300 | [diff] [blame] | 164 | block: suspend ActorScope<E>.() -> Unit |
| 165 | ): ActorJob<E> = |
| 166 | actor(context, capacity, start, block = block) as ActorJob<E> |
| 167 | |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 168 | private open class ActorCoroutine<E>( |
| 169 | parentContext: CoroutineContext, |
| 170 | channel: Channel<E>, |
| 171 | active: Boolean |
Roman Elizarov | 9faa61e | 2018-02-22 23:20:28 +0300 | [diff] [blame] | 172 | ) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E>, ActorJob<E> { |
| 173 | override fun onCancellation(cause: Throwable?) { |
Vsevolod Tolstopyatov | fd42442 | 2018-05-23 18:48:37 +0300 | [diff] [blame] | 174 | _channel.cancel(cause) |
| 175 | // Always propagate the exception, don't wait for actor senders |
| 176 | if (cause != null) handleCoroutineException(context, cause) |
Roman Elizarov | 9faa61e | 2018-02-22 23:20:28 +0300 | [diff] [blame] | 177 | } |
| 178 | } |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 179 | |
| 180 | private class LazyActorCoroutine<E>( |
| 181 | parentContext: CoroutineContext, |
| 182 | channel: Channel<E>, |
| 183 | private val block: suspend ActorScope<E>.() -> Unit |
Roman Elizarov | aa461cf | 2018-04-11 13:20:29 +0300 | [diff] [blame] | 184 | ) : ActorCoroutine<E>(parentContext, channel, active = false), |
| 185 | SelectClause2<E, SendChannel<E>> { |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 186 | override fun onStart() { |
Roman Elizarov | a74eb5f | 2017-05-11 20:15:18 +0300 | [diff] [blame] | 187 | block.startCoroutineCancellable(this, this) |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 188 | } |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 189 | |
Vsevolod Tolstopyatov | fd42442 | 2018-05-23 18:48:37 +0300 | [diff] [blame] | 190 | override suspend fun send(element: E) { |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 191 | start() |
| 192 | return super.send(element) |
| 193 | } |
| 194 | |
| 195 | override fun offer(element: E): Boolean { |
| 196 | start() |
| 197 | return super.offer(element) |
| 198 | } |
| 199 | |
Roman Elizarov | db0e22d | 2017-08-29 18:15:57 +0300 | [diff] [blame] | 200 | override val onSend: SelectClause2<E, SendChannel<E>> |
| 201 | get() = this |
| 202 | |
| 203 | // registerSelectSend |
| 204 | override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) { |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 205 | start() |
Roman Elizarov | db0e22d | 2017-08-29 18:15:57 +0300 | [diff] [blame] | 206 | super.onSend.registerSelectClause2(select, param, block) |
Roman Elizarov | ecda27f | 2017-04-06 23:06:26 +0300 | [diff] [blame] | 207 | } |
Roman Elizarov | c0e19f8 | 2017-02-27 11:59:14 +0300 | [diff] [blame] | 208 | } |