blob: f90a36f66a615d4ea945c1e6133088f5fed72ab7 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov187eace2017-01-31 09:39:58 +03005package kotlinx.coroutines.experimental.channels
6
Roman Elizarov89f8ff72018-03-14 13:39:03 +03007import kotlinx.coroutines.experimental.*
Roman Elizarovf2a710a2017-07-21 18:33:59 +03008import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
9import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
Roman Elizarov89f8ff72018-03-14 13:39:03 +030010import kotlinx.coroutines.experimental.selects.*
Roman Elizarov187eace2017-01-31 09:39:58 +030011
12/**
13 * Sender's interface to [Channel].
14 */
15public interface SendChannel<in E> {
16 /**
17 * Returns `true` if this channel was closed by invocation of [close] and thus
Roman Elizarov95562422017-09-27 17:41:01 +030018 * the [send] and [offer] attempts throws exception.
Roman Elizarov187eace2017-01-31 09:39:58 +030019 */
20 public val isClosedForSend: Boolean
21
22 /**
23 * Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
24 * This function returns `false` for [isClosedForSend] channel.
25 */
26 public val isFull: Boolean
27
28 /**
Roman Elizarov1216e912017-02-22 09:57:06 +030029 * Adds [element] into to this channel, suspending the caller while this channel [isFull],
Roman Elizarov95562422017-09-27 17:41:01 +030030 * or throws exception if the channel [isClosedForSend] (see [close] for details).
Roman Elizarov187eace2017-01-31 09:39:58 +030031 *
Roman Elizarov4e601322017-03-17 12:46:36 +030032 * Note, that closing a channel _after_ this function had suspended does not cause this suspended send invocation
33 * to abort, because closing a channel is conceptually like sending a special "close token" over this channel.
34 * All elements that are sent over the channel are delivered in first-in first-out order. The element that
35 * is being sent will get delivered to receivers before a close token.
36 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +030037 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +030038 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +030039 *
40 * *Cancellation of suspended send is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +030041 * throws [CancellationException] it means that the [element] was not sent to this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +030042 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
43 * continue to execute even after it was cancelled from the same thread in the case when this send operation
44 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +030045 *
46 * Note, that this function does not check for cancellation when it is not suspended.
47 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
Roman Elizarov1216e912017-02-22 09:57:06 +030048 *
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030049 * This function can be used in [select] invocation with [onSend] clause.
Roman Elizarov1216e912017-02-22 09:57:06 +030050 * Use [offer] to try sending to this channel without waiting.
Roman Elizarov187eace2017-01-31 09:39:58 +030051 */
52 public suspend fun send(element: E)
53
54 /**
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030055 * Clause for [select] expression of [send] suspending function that selects when the element that is specified
56 * as parameter is sent to the channel. When the clause is selected the reference to this channel
57 * is passed into the corresponding block.
58 *
Roman Elizarov95562422017-09-27 17:41:01 +030059 * The [select] invocation fails with exception if the channel [isClosedForSend] (see [close] for details).
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030060 */
61 public val onSend: SelectClause2<E, SendChannel<E>>
62
63 /**
Roman Elizarov187eace2017-01-31 09:39:58 +030064 * Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
65 * and returns `true`. Otherwise, it returns `false` immediately
Roman Elizarov95562422017-09-27 17:41:01 +030066 * or throws exception if the channel [isClosedForSend] (see [close] for details).
Roman Elizarov187eace2017-01-31 09:39:58 +030067 */
68 public fun offer(element: E): Boolean
69
70 /**
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030071 * Closes this channel with an optional exceptional [cause].
72 * This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
Roman Elizarovb555d912017-08-17 21:01:33 +030073 * Conceptually, its sends a special "close token" over this channel.
74 *
75 * Immediately after invocation of this function
Roman Elizarov187eace2017-01-31 09:39:58 +030076 * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
77 * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
78 * are received.
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030079 *
Roman Elizarov95562422017-09-27 17:41:01 +030080 * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send or receive.
81 * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
82 * receive on a failed channel throw the specified [cause] exception.
Roman Elizarov187eace2017-01-31 09:39:58 +030083 */
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030084 public fun close(cause: Throwable? = null): Boolean
Roman Elizarov187eace2017-01-31 09:39:58 +030085}
86
87/**
88 * Receiver's interface to [Channel].
89 */
90public interface ReceiveChannel<out E> {
91 /**
92 * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030093 * side and all previously sent items were already received, so that the [receive] attempt
94 * throws [ClosedReceiveChannelException]. If the channel was closed because of the exception, it
95 * is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
96 * an element from a failed channel throw the original [close][SendChannel.close] cause exception.
Roman Elizarov187eace2017-01-31 09:39:58 +030097 */
98 public val isClosedForReceive: Boolean
99
100 /**
101 * Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
102 * This function returns `false` for [isClosedForReceive] channel.
103 */
104 public val isEmpty: Boolean
105
106 /**
107 * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
108 * or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300109 * If the channel was closed because of the exception, it is called a _failed_ channel and this function
110 * throws the original [close][SendChannel.close] cause exception.
Roman Elizarov187eace2017-01-31 09:39:58 +0300111 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300112 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300113 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300114 *
115 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300116 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300117 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
118 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
119 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300120 *
121 * Note, that this function does not check for cancellation when it is not suspended.
122 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
Roman Elizarov1216e912017-02-22 09:57:06 +0300123 *
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300124 * This function can be used in [select] invocation with [onReceive] clause.
Roman Elizarov1216e912017-02-22 09:57:06 +0300125 * Use [poll] to try receiving from this channel without waiting.
Roman Elizarov187eace2017-01-31 09:39:58 +0300126 */
127 public suspend fun receive(): E
128
129 /**
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300130 * Clause for [select] expression of [receive] suspending function that selects with the element that
131 * is received from the channel.
Roman Elizarov95562422017-09-27 17:41:01 +0300132 * The [select] invocation fails with exception if the channel
133 * [isClosedForReceive] (see [close][SendChannel.close] for details).
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300134 */
135 public val onReceive: SelectClause1<E>
136
137 /**
Roman Elizarov187eace2017-01-31 09:39:58 +0300138 * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
Roman Elizarov95562422017-09-27 17:41:01 +0300139 * or returns `null` if the channel is [closed][isClosedForReceive] without cause
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300140 * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300141 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300142 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300143 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300144 *
145 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300146 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300147 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
148 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
149 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300150 *
151 * Note, that this function does not check for cancellation when it is not suspended.
152 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
Roman Elizarov1216e912017-02-22 09:57:06 +0300153 *
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300154 * This function can be used in [select] invocation with [onReceiveOrNull] clause.
Roman Elizarov1216e912017-02-22 09:57:06 +0300155 * Use [poll] to try receiving from this channel without waiting.
Roman Elizarov187eace2017-01-31 09:39:58 +0300156 */
157 public suspend fun receiveOrNull(): E?
158
159 /**
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300160 * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
161 * is received from the channel or selects with `null` if if the channel
Roman Elizarov95562422017-09-27 17:41:01 +0300162 * [isClosedForReceive] without cause. The [select] invocation fails with
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300163 * the original [close][SendChannel.close] cause exception if the channel has _failed_.
164 */
165 public val onReceiveOrNull: SelectClause1<E?>
166
167 /**
Roman Elizarov95562422017-09-27 17:41:01 +0300168 * Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
169 * or is [isClosedForReceive] without cause.
170 * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300171 */
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300172 public fun poll(): E?
Roman Elizarov187eace2017-01-31 09:39:58 +0300173
174 /**
175 * Returns new iterator to receive elements from this channels using `for` loop.
Roman Elizarov95562422017-09-27 17:41:01 +0300176 * Iteration completes normally when the channel is [isClosedForReceive] without cause and
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300177 * throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300178 */
179 public operator fun iterator(): ChannelIterator<E>
Roman Elizarovb555d912017-08-17 21:01:33 +0300180
181 /**
182 * Cancels reception of remaining elements from this channel. This function closes the channel with
183 * the specified cause (unless it was already closed) and removes all buffered sent elements from it.
184 * This function returns `true` if the channel was not closed previously, or `false` otherwise.
185 *
186 * Immediately after invocation of this function [isClosedForReceive] and
187 * [isClosedForSend][SendChannel.isClosedForSend]
188 * on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
189 * afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
190 * [ClosedReceiveChannelException] if it was cancelled without a cause.
191 * A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
192 * receive on a failed channel throw the specified [cause] exception.
193 */
194 public fun cancel(cause: Throwable? = null): Boolean
Roman Elizarov187eace2017-01-31 09:39:58 +0300195}
196
197/**
198 * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
199 * from concurrent coroutines.
200 */
201public interface ChannelIterator<out E> {
202 /**
203 * Returns `true` if the channel has more elements suspending the caller while this channel
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300204 * [isEmpty][ReceiveChannel.isEmpty] or returns `false` if the channel
Roman Elizarov95562422017-09-27 17:41:01 +0300205 * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300206 * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
207 *
Roman Elizarov187eace2017-01-31 09:39:58 +0300208 * This function retrieves and removes the element from this channel for the subsequent invocation
209 * of [next].
210 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300211 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300212 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300213 *
214 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300215 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300216 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
217 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
218 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300219 *
220 * Note, that this function does not check for cancellation when it is not suspended.
221 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
222 */
223 public suspend operator fun hasNext(): Boolean
224
225 /**
226 * Retrieves and removes the element from this channel suspending the caller while this channel
227 * [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
Roman Elizarov95562422017-09-27 17:41:01 +0300228 * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause.
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300229 * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
Roman Elizarov187eace2017-01-31 09:39:58 +0300230 *
Roman Elizarovd82b3a92017-06-23 21:52:08 +0300231 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Roman Elizarov187eace2017-01-31 09:39:58 +0300232 * function is suspended, this function immediately resumes with [CancellationException].
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300233 *
234 * *Cancellation of suspended receive is atomic* -- when this function
Roman Elizarov187eace2017-01-31 09:39:58 +0300235 * throws [CancellationException] it means that the element was not retrieved from this channel.
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300236 * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
237 * continue to execute even after it was cancelled from the same thread in the case when this receive operation
238 * was already resumed and the continuation was posted for execution to the thread's queue.
Roman Elizarov187eace2017-01-31 09:39:58 +0300239 *
240 * Note, that this function does not check for cancellation when it is not suspended.
241 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
242 */
243 public suspend operator fun next(): E
244}
245
246/**
247 * Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
248 * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
249 * but it has suspending operations instead of blocking ones and it can be closed.
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +0300250 *
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300251 * See `Channel(capacity)` factory function for the description of available channel implementations.
Roman Elizarov187eace2017-01-31 09:39:58 +0300252 */
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300253public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
254 /**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300255 * Constants for channel factory function `Channel()`.
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300256 */
257 public companion object Factory {
258 /**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300259 * Requests channel with unlimited capacity buffer in `Channel(...)` factory function --
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300260 * the [LinkedListChannel] gets created.
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300261 */
262 public const val UNLIMITED = Int.MAX_VALUE
263
264 /**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300265 * Requests conflated channel in `Channel(...)` factory function --
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300266 * the [ConflatedChannel] gets created.
267 */
268 public const val CONFLATED = -1
269
270 /**
Roman Elizarov8385ec92017-05-11 18:32:52 +0300271 * Creates a channel with the specified buffer capacity (or without a buffer by default).
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300272 * @suppress **Deprecated**
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300273 */
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300274 @Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
275 public operator fun <E> invoke(capacity: Int = 0): Channel<E> = Channel(capacity)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300276 }
277}
Roman Elizarov187eace2017-01-31 09:39:58 +0300278
279/**
Roman Elizarovf2a710a2017-07-21 18:33:59 +0300280 * Creates a channel without a buffer -- [RendezvousChannel].
281 */
282public fun <E> Channel(): Channel<E> = RendezvousChannel<E>()
283
284/**
285 * Creates a channel with the specified buffer capacity (or without a buffer by default).
286 *
287 * The resulting channel type depends on the specified [capacity] parameter:
288 * * when `capacity` is 0 -- creates [RendezvousChannel] without a buffer;
289 * * when `capacity` is [Channel.UNLIMITED] -- creates [LinkedListChannel] with buffer of unlimited size;
290 * * when `capacity` is [Channel.CONFLATED] -- creates [ConflatedChannel] that conflates back-to-back sends;
291 * * when `capacity` is positive, but less than [UNLIMITED] -- creates [ArrayChannel] with a buffer of the specified `capacity`;
292 * * otherwise -- throws [IllegalArgumentException].
293 */
294public fun <E> Channel(capacity: Int): Channel<E> =
295 when (capacity) {
296 0 -> RendezvousChannel()
297 UNLIMITED -> LinkedListChannel()
298 CONFLATED -> ConflatedChannel()
299 else -> ArrayChannel(capacity)
300 }
301
302/**
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300303 * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
Roman Elizarov95562422017-09-27 17:41:01 +0300304 * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300305 * exception on send attempts.
Roman Elizarov187eace2017-01-31 09:39:58 +0300306 */
Roman Elizarova197d6d2017-02-20 16:11:40 +0300307public class ClosedSendChannelException(message: String?) : CancellationException(message)
Roman Elizarov187eace2017-01-31 09:39:58 +0300308
309/**
310 * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
Roman Elizarov95562422017-09-27 17:41:01 +0300311 * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300312 * exception on receive attempts.
Roman Elizarov187eace2017-01-31 09:39:58 +0300313 */
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300314public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)