blob: f92da1874366e6e1e6cab654db7e55d1b1e21ce8 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarov187eace2017-01-31 09:39:58 +030017package kotlinx.coroutines.experimental.channels
18
Roman Elizarov89f8ff72018-03-14 13:39:03 +030019import kotlinx.coroutines.experimental.*
Roman Elizarovf2a710a2017-07-21 18:33:59 +030020import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
21import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
Roman Elizarov89f8ff72018-03-14 13:39:03 +030022import kotlinx.coroutines.experimental.selects.*
Roman Elizarov187eace2017-01-31 09:39:58 +030023
24/**
25 * Sender's interface to [Channel].
26 */
27public interface SendChannel<in E> {
28 /**
29 * Returns `true` if this channel was closed by invocation of [close] and thus
Roman Elizarov95562422017-09-27 17:41:01 +030030 * the [send] and [offer] attempts throws exception.
Roman Elizarov187eace2017-01-31 09:39:58 +030031 */
32 public val isClosedForSend: Boolean
33
34 /**
35 * Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
36 * This function returns `false` for [isClosedForSend] channel.
37 */
38 public val isFull: Boolean
39
40 /**
Roman Elizarov1216e912017-02-22 09:57:06 +030041 * Adds [element] into to this channel, suspending the caller while this channel [isFull],
Roman Elizarov95562422017-09-27 17:41:01 +030042 * or throws exception if the channel [isClosedForSend] (see [close] for details).
Roman Elizarov187eace2017-01-31 09:39:58 +030043 *
Roman Elizarov4e601322017-03-17 12:46:36 +030044 * Note, that closing a channel _after_ this function had suspended does not cause this suspended send invocation
45 * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
46 * All elements that are sent over the channel are delivered in first-in first-out order. The element that
47 * is being sent will get delivered to receivers before a close token.
48 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +030049 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +030050 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +030051 *
52 * *Cancellation of suspended send is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +030053 * throws [CancellationException] it means that the [element] was not sent to this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +030054 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
55 * continue to execute even after it was cancelled from the same thread in the case when this send operation
56 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +030057 *
58 * Note, that this function does not check for cancellation when it is not suspended.
59 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
Roman Elizarov1216e912017-02-22 09:57:06 +030060 *
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030061 * This function can be used in [select] invocation with [onSend] clause.
Roman Elizarov1216e912017-02-22 09:57:06 +030062 * Use [offer] to try sending to this channel without waiting.
Roman Elizarov187eace2017-01-31 09:39:58 +030063 */
64 public suspend fun send(element: E)
65
66 /**
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030067 * Clause for [select] expression of [send] suspending function that selects when the element that is specified
68 * as parameter is sent to the channel. When the clause is selected the reference to this channel
69 * is passed into the corresponding block.
70 *
Roman Elizarov95562422017-09-27 17:41:01 +030071 * The [select] invocation fails with exception if the channel [isClosedForSend] (see [close] for details).
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030072 */
73 public val onSend: SelectClause2<E, SendChannel<E>>
74
75 /**
Roman Elizarov187eace2017-01-31 09:39:58 +030076 * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
77 * and returns `true`. Otherwise, it returns `false` immediately
Roman Elizarov95562422017-09-27 17:41:01 +030078 * or throws exception if the channel [isClosedForSend] (see [close] for details).
Roman Elizarov187eace2017-01-31 09:39:58 +030079 */
80 public fun offer(element: E): Boolean
81
82 /**
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030083 * Closes this channel with an optional exceptional [cause].
84 * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
Roman Elizarovb555d912017-08-17 21:01:33 +030085 * Conceptually, its sends a special "close token" over this channel.
86 *
87 * Immediately after invocation of this function
Roman Elizarov187eace2017-01-31 09:39:58 +030088 * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
89 * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
90 * are received.
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030091 *
Roman Elizarov95562422017-09-27 17:41:01 +030092 * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send or receive.
93 * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
94 * receive on a failed channel throw the specified [cause] exception.
Roman Elizarov187eace2017-01-31 09:39:58 +030095 */
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030096 public fun close(cause: Throwable? = null): Boolean
Roman Elizarov187eace2017-01-31 09:39:58 +030097}
98
99/**
100 * Receiver's interface to [Channel].
101 */
102public interface ReceiveChannel<out E> {
103 /**
104 * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300105 * side and all previously sent items were already received, so that the [receive] attempt
106 * throws [ClosedReceiveChannelException]. If the channel was closed because of the exception, it
107 * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
108 * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
Roman Elizarov187eace2017-01-31 09:39:58 +0300109 */
110 public val isClosedForReceive: Boolean
111
112 /**
113 * Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
114 * This function returns `false` for [isClosedForReceive] channel.
115 */
116 public val isEmpty: Boolean
117
118 /**
119 * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
120 * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300121 * If the channel was closed because of the exception, it is called a _failed_ channel and this function
122 * throws the original [close][SendChannel.close] cause exception.
Roman Elizarov187eace2017-01-31 09:39:58 +0300123 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300124 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300125 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300126 *
127 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300128 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300129 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
130 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
131 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300132 *
133 * Note, that this function does not check for cancellation when it is not suspended.
134 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
Roman Elizarov1216e912017-02-22 09:57:06 +0300135 *
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300136 * This function can be used in [select] invocation with [onReceive] clause.
Roman Elizarov1216e912017-02-22 09:57:06 +0300137 * Use [poll] to try receiving from this channel without waiting.
Roman Elizarov187eace2017-01-31 09:39:58 +0300138 */
139 public suspend fun receive(): E
140
141 /**
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300142 * Clause for [select] expression of [receive] suspending function that selects with the element that
143 * is received from the channel.
Roman Elizarov95562422017-09-27 17:41:01 +0300144 * The [select] invocation fails with exception if the channel
145 * [isClosedForReceive] (see [close][SendChannel.close] for details).
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300146 */
147 public val onReceive: SelectClause1<E>
148
149 /**
Roman Elizarov187eace2017-01-31 09:39:58 +0300150 * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
Roman Elizarov95562422017-09-27 17:41:01 +0300151 * or returns `null` if the channel is [closed][isClosedForReceive] without cause
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300152 * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300153 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300154 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300155 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300156 *
157 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300158 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300159 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
160 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
161 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300162 *
163 * Note, that this function does not check for cancellation when it is not suspended.
164 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
Roman Elizarov1216e912017-02-22 09:57:06 +0300165 *
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300166 * This function can be used in [select] invocation with [onReceiveOrNull] clause.
Roman Elizarov1216e912017-02-22 09:57:06 +0300167 * Use [poll] to try receiving from this channel without waiting.
Roman Elizarov187eace2017-01-31 09:39:58 +0300168 */
169 public suspend fun receiveOrNull(): E?
170
171 /**
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300172 * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
173 * is received from the channel or selects with `null` if if the channel
Roman Elizarov95562422017-09-27 17:41:01 +0300174 * [isClosedForReceive] without cause. The [select] invocation fails with
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300175 * the original [close][SendChannel.close] cause exception if the channel has _failed_.
176 */
177 public val onReceiveOrNull: SelectClause1<E?>
178
179 /**
Roman Elizarov95562422017-09-27 17:41:01 +0300180 * Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
181 * or is [isClosedForReceive] without cause.
182 * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300183 */
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300184 public fun poll(): E?
Roman Elizarov187eace2017-01-31 09:39:58 +0300185
186 /**
187 * Returns new iterator to receive elements from this channels using `for` loop.
Roman Elizarov95562422017-09-27 17:41:01 +0300188 * Iteration completes normally when the channel is [isClosedForReceive] without cause and
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300189 * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300190 */
191 public operator fun iterator(): ChannelIterator<E>
Roman Elizarovb555d912017-08-17 21:01:33 +0300192
193 /**
194 * Cancels reception of remaining elements from this channel. This function closes the channel with
195 * the specified cause (unless it was already closed) and removes all buffered sent elements from it.
196 * This function returns `true` if the channel was not closed previously, or `false` otherwise.
197 *
198 * Immediately after invocation of this function [isClosedForReceive] and
199 * [isClosedForSend][SendChannel.isClosedForSend]
200 * on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
201 * afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
202 * [ClosedReceiveChannelException] if it was cancelled without a cause.
203 * A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
204 * receive on a failed channel throw the specified [cause] exception.
205 */
206 public fun cancel(cause: Throwable? = null): Boolean
Roman Elizarov187eace2017-01-31 09:39:58 +0300207}
208
209/**
210 * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
211 * from concurrent coroutines.
212 */
213public interface ChannelIterator<out E> {
214 /**
215 * Returns `true` if the channel has more elements suspending the caller while this channel
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300216 * [isEmpty][ReceiveChannel.isEmpty] or returns `false` if the channel
Roman Elizarov95562422017-09-27 17:41:01 +0300217 * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300218 * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
219 *
Roman Elizarov187eace2017-01-31 09:39:58 +0300220 * This function retrieves and removes the element from this channel for the subsequent invocation
221 * of [next].
222 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300223 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300224 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300225 *
226 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300227 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300228 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
229 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
230 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300231 *
232 * Note, that this function does not check for cancellation when it is not suspended.
233 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
234 */
235 public suspend operator fun hasNext(): Boolean
236
237 /**
238 * Retrieves and removes the element from this channel suspending the caller while this channel
239 * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
Roman Elizarov95562422017-09-27 17:41:01 +0300240 * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300241 * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300242 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300243 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300244 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300245 *
246 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300247 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300248 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
249 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
250 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300251 *
252 * Note, that this function does not check for cancellation when it is not suspended.
253 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
254 */
255 public suspend operator fun next(): E
256}
257
258/**
259 * Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
260 * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
261 * but it has suspending operations instead of blocking ones and it can be closed.
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +0300262 *
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300263 * See `Channel(capacity)` factory function for the description of available channel implementations.
Roman Elizarov187eace2017-01-31 09:39:58 +0300264 */
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300265public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
266 /**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300267 * Constants for channel factory function `Channel()`.
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300268 */
269 public companion object Factory {
270 /**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300271 * Requests channel with unlimited capacity buffer in `Channel(...)` factory function --
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300272 * the [LinkedListChannel] gets created.
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300273 */
274 public const val UNLIMITED = Int.MAX_VALUE
275
276 /**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300277 * Requests conflated channel in `Channel(...)` factory function --
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300278 * the [ConflatedChannel] gets created.
279 */
280 public const val CONFLATED = -1
281
282 /**
Roman Elizarov8385ec92017-05-11 18:32:52 +0300283 * Creates a channel with the specified buffer capacity (or without a buffer by default).
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300284 * @suppress **Deprecated**
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300285 */
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300286 @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
287 public operator fun <E> invoke(capacity: Int = 0): Channel<E> = Channel(capacity)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300288 }
289}
Roman Elizarov187eace2017-01-31 09:39:58 +0300290
291/**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300292 * Creates a channel without a buffer -- [RendezvousChannel].
293 */
294public fun <E> Channel(): Channel<E> = RendezvousChannel<E>()
295
296/**
297 * Creates a channel with the specified buffer capacity (or without a buffer by default).
298 *
299 * The resulting channel type depends on the specified [capacity] parameter:
300 * * when `capacity` is 0 -- creates [RendezvousChannel] without a buffer;
301 * * when `capacity` is [Channel.UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size;
302 * * when `capacity` is [Channel.CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends;
303 * * when `capacity` is positive, but less than [UNLIMITED] -- creates [ArrayChannel] with a buffer of the specified `capacity`;
304 * * otherwise -- throws [IllegalArgumentException].
305 */
306public fun <E> Channel(capacity: Int): Channel<E> =
307 when (capacity) {
308 0 -> RendezvousChannel()
309 UNLIMITED -> LinkedListChannel()
310 CONFLATED -> ConflatedChannel()
311 else -> ArrayChannel(capacity)
312 }
313
314/**
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300315 * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
Roman Elizarov95562422017-09-27 17:41:01 +0300316 * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300317 * exception on send attempts.
Roman Elizarov187eace2017-01-31 09:39:58 +0300318 */
Roman Elizarova197d6d2017-02-20 16:11:40 +0300319public class ClosedSendChannelException(message: String?) : CancellationException(message)
Roman Elizarov187eace2017-01-31 09:39:58 +0300320
321/**
322 * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
Roman Elizarov95562422017-09-27 17:41:01 +0300323 * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300324 * exception on receive attempts.
Roman Elizarov187eace2017-01-31 09:39:58 +0300325 */
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300326public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)