blob: 893360a45b3e3dc409328e8c72156226a4e16db4 [file] [log] [blame]
Roman Elizarov86349be2017-03-17 16:47:37 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Vsevolod Tolstopyatov26b86e92018-06-07 15:24:31 +030016@file:JvmMultifileClass
17@file:JvmName("ChannelsKt")
Roman Elizarov86349be2017-03-17 16:47:37 +030018
19package kotlinx.coroutines.experimental.channels
20
Roman Elizarov02e6c812018-01-15 14:59:35 +030021import kotlinx.coroutines.experimental.*
Vsevolod Tolstopyatovdd3b65a2018-06-07 15:51:03 +030022import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarov02e6c812018-01-15 14:59:35 +030023import kotlin.coroutines.experimental.*
Francesco Vascocdc58652017-11-05 17:56:44 +010024
/**
 * Message text describing a closed channel.
 * NOTE(review): presumably used as the default exception message when a channel is closed
 * without an explicit cause; the usages are not visible in this file, so confirm against callers.
 */
Roman Elizarov4b0ef7b2017-04-17 12:39:29 +030025internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
26
Roman Elizarovb555d912017-08-17 21:01:33 +030027
28// -------- Conversions to ReceiveChannel --------
29
/**
 * Returns a channel from which all elements of this [Iterable] can be received, in iteration order.
 *
 * The elements are sent from a [produce] block that runs in the given [context]
 * ([Unconfined] by default).
 */
public fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
    produce(context) {
        val source = this@asReceiveChannel.iterator()
        while (source.hasNext()) {
            send(source.next())
        }
    }
38
/**
 * Returns a channel from which all elements of this [Sequence] can be received, in iteration order.
 *
 * The elements are sent from a [produce] block that runs in the given [context]
 * ([Unconfined] by default).
 */
public fun <E> Sequence<E>.asReceiveChannel(context: CoroutineContext = Unconfined): ReceiveChannel<E> =
    produce(context) {
        val source = this@asReceiveChannel.iterator()
        while (source.hasNext()) {
            send(source.next())
        }
    }
47
48// -------- Operations on BroadcastChannel --------
49
/**
 * Opens a subscription to this [BroadcastChannel], runs [block] with that subscription as its
 * receiver, and always invokes [cancel][ReceiveChannel.cancel] on the subscription afterwards,
 * whether [block] completes normally or throws.
 *
 * @return the value produced by [block].
 */
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    val subscription = openSubscription()
    try {
        return subscription.block()
    } finally {
        // Release the subscription no matter how the block finished.
        subscription.cancel()
    }
}
62
/**
 * Subscribes to this [BroadcastChannel] and invokes [action] for every element received
 * from the subscription. The subscription is opened and cancelled by [consume].
 */
public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
    consume {
        for (received in this) {
            action(received)
        }
    }
70
/**
 * @suppress: **Deprecated**: kept only for binary compatibility with old code;
 * forwards to the inline `consumeEach`.
 */
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Unit) =
    consumeEach { received -> action(received) }
77
78// -------- Operations on ReceiveChannel --------
79
/**
 * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on this
 * [ReceiveChannel], passing along the cause the handler was invoked with.
 * See also [ReceiveChannel.consume].
 *
 * **WARNING**: It is planned that in the future a second invocation of this method
 * on a channel that is already being consumed is going to fail fast, that is,
 * immediately throw an [IllegalStateException].
 * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
 * for details.
 */
public fun ReceiveChannel<*>.consumes(): CompletionHandler =
    { cause: Throwable? ->
        this.cancel(cause)
    }
92
/**
 * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on every one of the
 * given [channels], passing along the cause the handler was invoked with.
 *
 * Every channel is cancelled even if cancelling an earlier one throws. The first exception thrown
 * is rethrown after all channels were processed, with any later exceptions attached to it as
 * suppressed ones.
 *
 * See also [ReceiveChannel.consumes] for a version that works on a single channel.
 */
public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
    { cause: Throwable? ->
        var firstFailure: Throwable? = null
        for (channel in channels) {
            try {
                channel.cancel(cause)
            } catch (e: Throwable) {
                val earlier = firstFailure
                if (earlier != null) {
                    // Keep the first failure as the primary one and attach the rest to it.
                    earlier.addSuppressedThrowable(e)
                } else {
                    firstFailure = e
                }
            }
        }
        firstFailure?.let { throw it }
    }
113
/**
 * Runs [block] with this channel as its receiver and always invokes
 * [cancel][ReceiveChannel.cancel] on the channel afterwards. When [block] throws, the thrown
 * exception is passed to `cancel` as the cause and then propagates to the caller; otherwise
 * `cancel` is invoked with a `null` cause.
 *
 * **WARNING**: It is planned that in the future a second invocation of this method
 * on a channel that is already being consumed is going to fail fast, that is,
 * immediately throw an [IllegalStateException].
 * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
 * for details.
 *
 * The operation is _terminal_.
 *
 * @return the value produced by [block].
 */
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    var failure: Throwable? = null
    try {
        return block()
    } catch (thrown: Throwable) {
        // Remember the exception so that the finally clause can hand it to cancel().
        failure = thrown
        throw thrown
    } finally {
        cancel(failure)
    }
}
Roman Elizarovb555d912017-08-17 21:01:33 +0300137
/**
 * Invokes [action] for every element received from this channel.
 *
 * **WARNING**: It is planned that in the future a second invocation of this method
 * on a channel that is already being consumed is going to fail fast, that is,
 * immediately throw an [IllegalStateException].
 * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
 * for details.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
    consume {
        for (received in this) {
            action(received)
        }
    }
154
/**
 * @suppress: **Deprecated**: kept only for binary compatibility with old code;
 * forwards to the inline `consumeEach`.
 */
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit) =
    consumeEach { received -> action(received) }
161
/**
 * Invokes [action] for every received element, wrapped in an [IndexedValue] that carries the
 * zero-based position of the element in the order of reception.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
    var nextIndex = 0
    consumeEach { received ->
        val indexed = IndexedValue(nextIndex, received)
        nextIndex++
        action(indexed)
    }
}
174
/**
 * Returns the element at the given [index].
 *
 * @throws IndexOutOfBoundsException if the [index] is out of bounds of this channel.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
    elementAtOrElse(index) { _ ->
        throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
    }
183
/**
 * Returns the element at the given [index], or the result of calling [defaultValue] with that
 * index when it is negative or the channel ends before reaching it.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
    consume {
        if (index >= 0) {
            var position = 0
            for (candidate in this) {
                if (position == index) return candidate
                position++
            }
        }
        // Negative index, or the channel had too few elements.
        defaultValue(index)
    }
201
/**
 * Returns the element at the given [index], or `null` when the index is negative or the channel
 * ends before reaching it.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
    consume {
        if (index >= 0) {
            var position = 0
            for (candidate in this) {
                if (position == index) return candidate
                position++
            }
        }
        // Negative index, or the channel had too few elements.
        null
    }
219
/**
 * Returns the first element for which [predicate] returns `true`, or `null` if there is none.
 * Delegates to `firstOrNull`.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
    firstOrNull { candidate -> predicate(candidate) }
228
/**
 * Returns the last element for which [predicate] returns `true`, or `null` if there is none.
 * Delegates to `lastOrNull`.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
    lastOrNull { candidate -> predicate(candidate) }
237
/**
 * Returns the first element received from this channel.
 *
 * @throws NoSuchElementException if the channel is empty.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.first(): E =
    consume {
        val cursor = iterator()
        if (cursor.hasNext()) {
            cursor.next()
        } else {
            throw NoSuchElementException("ReceiveChannel is empty.")
        }
    }
252
/**
 * Returns the first element for which [predicate] returns `true`.
 *
 * @throws NoSuchElementException if no such element is found.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
    consumeEach { candidate ->
        // Non-local return: leaves `first` as soon as a match is seen.
        if (predicate(candidate)) return candidate
    }
    throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
}
266
/**
 * Returns the first element received from this channel, or `null` if the channel is empty.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
    consume {
        val cursor = iterator()
        if (cursor.hasNext()) cursor.next() else null
    }
280
/**
 * Returns the first element for which [predicate] returns `true`, or `null` if no such element
 * was found.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 *
 * @param predicate test applied to each received element in order of reception.
 * @return the first matching element, or `null` when the channel ends without a match.
 */
public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
    consumeEach { candidate ->
        // Non-local return: leaves `firstOrNull` as soon as a match is seen.
        if (predicate(candidate)) return candidate
    }
    return null
}
293
/**
 * Returns the zero-based index of the first received element equal to [element],
 * or -1 if the channel does not contain such an element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
    var position = 0
    consumeEach { received ->
        if (element == received) {
            return position
        }
        position++
    }
    return -1
}
309
/**
 * Returns the zero-based index of the first element for which [predicate] returns `true`,
 * or -1 if the channel does not contain such an element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
    var position = 0
    consumeEach { received ->
        if (predicate(received)) {
            return position
        }
        position++
    }
    return -1
}
325
/**
 * Returns the zero-based index of the last element for which [predicate] returns `true`,
 * or -1 if the channel does not contain such an element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 *
 * @param predicate test applied to each received element in order of reception.
 * @return the index of the last matching element, or -1 when nothing matched.
 */
public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
    var lastIndex = -1
    var index = 0
    consumeEach { received ->
        // Every match overwrites the previous one, so the last match wins.
        if (predicate(received))
            lastIndex = index
        index++
    }
    return lastIndex
}
342
/**
 * Returns the last element received from this channel.
 *
 * @throws NoSuchElementException if the channel is empty.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.last(): E =
    consume {
        val cursor = iterator()
        if (!cursor.hasNext()) {
            throw NoSuchElementException("ReceiveChannel is empty.")
        }
        var latest = cursor.next()
        while (cursor.hasNext()) {
            latest = cursor.next()
        }
        latest
    }
360
/**
 * Returns the last element for which [predicate] returns `true`.
 *
 * @throws NoSuchElementException if no such element is found.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
    var latestMatch: E? = null
    // A separate flag is needed because a matching element may itself be null.
    var matched = false
    consumeEach { received ->
        if (predicate(received)) {
            matched = true
            latestMatch = received
        }
    }
    if (!matched) {
        throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
    }
    @Suppress("UNCHECKED_CAST")
    return latestMatch as E
}
381
/**
 * Returns the zero-based index of the last received element equal to [element],
 * or -1 if the channel does not contain such an element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
    var latestMatch = -1
    var position = 0
    consumeEach { received ->
        if (element == received) {
            latestMatch = position
        }
        position++
    }
    return latestMatch
}
398
/**
 * Returns the last element received from this channel, or `null` if the channel is empty.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
    consume {
        val cursor = iterator()
        if (!cursor.hasNext()) {
            return null
        }
        var latest = cursor.next()
        while (cursor.hasNext()) {
            latest = cursor.next()
        }
        latest
    }
415
/**
 * Returns the last element for which [predicate] returns `true`, or `null` if no such element
 * was found.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
    var latestMatch: E? = null
    consumeEach { received ->
        if (predicate(received)) {
            latestMatch = received
        }
    }
    return latestMatch
}
431
/**
 * Returns the only element of this channel.
 *
 * @throws NoSuchElementException if the channel is empty.
 * @throws IllegalArgumentException if the channel has more than one element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.single(): E =
    consume {
        val cursor = iterator()
        if (!cursor.hasNext()) {
            throw NoSuchElementException("ReceiveChannel is empty.")
        }
        val only = cursor.next()
        if (cursor.hasNext()) {
            throw IllegalArgumentException("ReceiveChannel has more than one element.")
        }
        only
    }
448
/**
 * Returns the only element for which [predicate] returns `true`.
 *
 * @throws NoSuchElementException if no element matches.
 * @throws IllegalArgumentException if more than one element matches.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
    var onlyMatch: E? = null
    // A separate flag is needed because a matching element may itself be null.
    var matched = false
    consumeEach { received ->
        if (predicate(received)) {
            if (matched) {
                throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
            }
            onlyMatch = received
            matched = true
        }
    }
    if (!matched) {
        throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
    }
    @Suppress("UNCHECKED_CAST")
    return onlyMatch as E
}
469
/**
 * Returns the only element of this channel, or `null` if the channel is empty or has more than
 * one element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
    consume {
        val cursor = iterator()
        if (!cursor.hasNext()) {
            return null
        }
        val only = cursor.next()
        // A second element means the channel does not hold a single value.
        if (cursor.hasNext()) null else only
    }
486
/**
 * Returns the only element for which [predicate] returns `true`, or `null` if no element or
 * more than one element matches.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
    var onlyMatch: E? = null
    var matched = false
    consumeEach { received ->
        if (predicate(received)) {
            // A second match: give up immediately via a non-local return.
            if (matched) return null
            onlyMatch = received
            matched = true
        }
    }
    return if (matched) onlyMatch else null
}
506
/**
 * Returns a channel containing all elements of this channel except the first [n] ones.
 * The returned channel is built by a [produce] block running in the given [context]
 * ([Unconfined] by default); a negative [n] fails the `require` check inside that block.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
    produce(context, onCompletion = consumes()) {
        require(n >= 0) { "Requested element count $n is less than zero." }
        var toSkip = n
        if (toSkip > 0) {
            // Receive and discard elements until n of them were skipped or the channel ended.
            for (skipped in this@drop) {
                toSkip--
                if (toSkip == 0) break
            }
        }
        // Forward everything that is left.
        for (element in this@drop) {
            send(element)
        }
    }
527
/**
 * Returns a channel that skips the leading elements for which [predicate] returns `true` and
 * contains the first non-matching element together with everything received after it.
 * The returned channel is built by a [produce] block running in the given [context]
 * ([Unconfined] by default).
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    produce(context, onCompletion = consumes()) {
        for (candidate in this@dropWhile) {
            if (predicate(candidate)) continue
            // First element that fails the predicate: emit it and stop testing.
            send(candidate)
            break
        }
        for (element in this@dropWhile) {
            send(element)
        }
    }
547
/**
 * Returns a channel containing only the elements for which [predicate] returns `true`.
 * The returned channel is built by a [produce] block running in the given [context]
 * ([Unconfined] by default).
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    produce(context, onCompletion = consumes()) {
        for (element in this@filter) {
            if (!predicate(element)) continue
            send(element)
        }
    }
561
/**
 * Returns a channel containing only the elements for which [predicate] returns `true`.
 * The returned channel is built by a [produce] block running in the given [context]
 * ([Unconfined] by default).
 *
 * @param [predicate] function that takes the zero-based index of an element and the element
 * itself and returns whether the element should be kept.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
    produce(context, onCompletion = consumes()) {
        var nextIndex = 0
        for (element in this@filterIndexed) {
            val position = nextIndex++
            if (predicate(position, element)) {
                send(element)
            }
        }
    }
578
/**
 * Adds every element for which [predicate] returns `true` to the given [destination] collection
 * and returns that collection.
 *
 * @param [predicate] function that takes the zero-based index of an element and the element
 * itself and returns whether the element should be added.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
    consumeEachIndexed { indexed ->
        if (predicate(indexed.index, indexed.value)) {
            destination.add(indexed.value)
        }
    }
    return destination
}
593
/**
 * Sends every element for which [predicate] returns `true` to the given [destination] channel
 * and returns that channel.
 *
 * @param [predicate] function that takes the zero-based index of an element and the element
 * itself and returns whether the element should be sent.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
    consumeEachIndexed { indexed ->
        if (predicate(indexed.index, indexed.value)) {
            destination.send(indexed.value)
        }
    }
    return destination
}
608
/**
 * Returns a channel containing only the elements for which [predicate] returns `false`.
 * Implemented as [filter] with the negated predicate, using the given [context]
 * ([Unconfined] by default).
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    filter(context) { element -> !predicate(element) }
618
/**
 * @suppress **Deprecated**: For binary compatibility only; forwards to the overload that takes
 * a context, passing [Unconfined] (that overload's default) explicitly.
 */
@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
public fun <E> ReceiveChannel<E>.filterNot(predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    filterNot(context = Unconfined, predicate = predicate)
Roman Elizarovb555d912017-08-17 21:01:33 +0300624
/**
 * Returns a channel containing all elements that are not `null`.
 * Implemented as [filter] with its default context; the result is cast to the non-null
 * element type, which is safe because `null` elements are filtered out.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
@Suppress("UNCHECKED_CAST")
public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
    filter { element -> element != null } as ReceiveChannel<E>
634
/**
 * Adds every element that is not `null` to the given [destination] collection and returns
 * that collection.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
    consumeEach { received ->
        if (received != null) {
            destination.add(received)
        }
    }
    return destination
}
647
/**
 * Sends every element that is not `null` to the given [destination] channel and returns
 * that channel.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
    consumeEach { received ->
        if (received != null) {
            destination.send(received)
        }
    }
    return destination
}
660
/**
 * Adds every element for which [predicate] returns `false` to the given [destination]
 * collection and returns that collection.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
    consumeEach { received ->
        val rejected = !predicate(received)
        if (rejected) {
            destination.add(received)
        }
    }
    return destination
}
673
/**
 * Sends every element for which [predicate] returns `false` to the given [destination]
 * channel and returns that channel.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
    consumeEach { received ->
        val rejected = !predicate(received)
        if (rejected) {
            destination.send(received)
        }
    }
    return destination
}
686
/**
 * Adds every element for which [predicate] returns `true` to the given [destination]
 * collection and returns that collection.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
    consumeEach { received ->
        val accepted = predicate(received)
        if (accepted) {
            destination.add(received)
        }
    }
    return destination
}
699
/**
 * Sends every element for which [predicate] returns `true` to the [destination] channel
 * and returns that same channel.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
    consumeEach { element ->
        val accepted = predicate(element)
        if (accepted) destination.send(element)
    }
    return destination
}
712
/**
 * Returns a channel that emits at most the first [n] elements of this channel.
 *
 * When [n] is zero the producer finishes without receiving anything from this channel.
 * A negative [n] is rejected by a `require` check that runs inside the producer coroutine.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Unconfined): ReceiveChannel<E> =
    produce(context, onCompletion = consumes()) {
        if (n == 0) return@produce
        require(n >= 0) { "Requested element count $n is less than zero." }
        // Count forwarded elements upwards and stop right after the n-th one,
        // so no extra element is received from the source.
        var forwarded = 0
        for (element in this@take) {
            send(element)
            forwarded++
            if (forwarded == n) return@produce
        }
    }
731
/**
 * Returns a channel that emits the leading elements of this channel for as long as
 * [predicate] returns `true`; the first element that fails the [predicate] ends the result.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    produce(context, onCompletion = consumes()) {
        for (element in this@takeWhile) {
            if (predicate(element)) send(element) else return@produce
        }
    }
746
/**
 * Builds a [Map] from the key-value pairs that [transform] produces for each element of this channel.
 *
 * If two pairs have the same key, the last one wins.
 *
 * The result is a [LinkedHashMap], so entries keep the order in which elements were received.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> {
    return associateTo(LinkedHashMap<K, V>(), transform)
}
760
/**
 * Builds a [Map] whose values are the elements of this channel, each stored under the key
 * that [keySelector] returns for it.
 *
 * If two elements have the same key, the last one wins.
 *
 * The result is a [LinkedHashMap], so entries keep the order in which elements were received.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> {
    return associateByTo(LinkedHashMap<K, E>(), keySelector)
}
774
/**
 * Builds a [Map] in which, for each element of this channel, the key comes from [keySelector]
 * and the value comes from [valueTransform].
 *
 * If two elements have the same key, the last one wins.
 *
 * The result is a [LinkedHashMap], so entries keep the order in which elements were received.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> {
    return associateByTo(LinkedHashMap<K, V>(), keySelector, valueTransform)
}
787
/**
 * Puts each element of this channel into the [destination] map under the key that
 * [keySelector] returns for it, then returns [destination].
 *
 * If two elements have the same key, the last one wins.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
    consumeEach { element ->
        val key = keySelector(element)
        destination.put(key, element)
    }
    return destination
}
804
/**
 * Puts one entry per element of this channel into the [destination] map, with the key
 * provided by [keySelector] and the value provided by [valueTransform], then returns [destination].
 *
 * If two elements have the same key, the last one wins.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
    consumeEach { element ->
        val key = keySelector(element)
        val value = valueTransform(element)
        destination.put(key, value)
    }
    return destination
}
821
/**
 * Puts the key-value pair that [transform] produces for each element of this channel
 * into the [destination] map, then returns [destination].
 *
 * If two pairs have the same key, the last one wins.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
    consumeEach { element ->
        val (key, value) = transform(element)
        destination.put(key, value)
    }
    return destination
}
837
/**
 * Sends every element of this channel to the [destination] channel and returns [destination].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
    consumeEach { element -> destination.send(element) }
    return destination
}
851
/**
 * Adds every element of this channel to the [destination] collection and returns [destination].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
    consumeEach { element -> destination.add(element) }
    return destination
}
864
/**
 * Collects all elements of this channel into a [List].
 *
 * The list is the one built by [toMutableList], exposed through the read-only [List] type.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> {
    return toMutableList()
}
873
/**
 * Collects all key-value pairs of this channel into a new [Map] (a [LinkedHashMap]).
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> {
    return toMap(LinkedHashMap<K, V>())
}
882
/**
 * Puts every key-value pair of this channel into the [destination] map and returns [destination].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
    consumeEach { pair ->
        destination.put(pair.first, pair.second)
    }
    return destination
}
895
/**
 * Collects all elements of this channel into a new [MutableList] (an [ArrayList]).
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> {
    return toCollection(ArrayList<E>())
}
904
/**
 * Collects all elements of this channel into a [Set].
 *
 * The set is the one built by [toMutableSet], so it keeps the order in which
 * elements were received from the original channel.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> {
    return toMutableSet()
}
Roman Elizarovb555d912017-08-17 21:01:33 +0300915
/**
 * Returns a single channel that emits, in order, all elements of the channels that
 * [transform] returns for each element of this channel.
 *
 * Each inner channel is drained completely with [toChannel] before the next element
 * of this channel is received.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
    produce(context, onCompletion = consumes()) {
        for (element in this@flatMap) {
            val inner = transform(element)
            inner.toChannel(this)
        }
    }
929
/**
 * Groups the elements of this channel by the key that [keySelector] returns for each of them
 * and returns a map from each key to the list of elements that share it.
 *
 * The result is a [LinkedHashMap], so keys keep the order in which they were first produced.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> {
    return groupByTo(LinkedHashMap<K, MutableList<E>>(), keySelector)
}
941
/**
 * Groups the values that [valueTransform] returns for each element of this channel by the key
 * that [keySelector] returns for that element, and returns a map from each key to the list
 * of corresponding values.
 *
 * The result is a [LinkedHashMap], so keys keep the order in which they were first produced.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> {
    return groupByTo(LinkedHashMap<K, MutableList<V>>(), keySelector, valueTransform)
}
954
/**
 * Groups the elements of this channel by the key that [keySelector] returns for each of them
 * and stores every group in the [destination] map as a list under its key.
 *
 * A new [ArrayList] is created for a key the first time that key is seen;
 * lists already present in [destination] are appended to.
 *
 * @return The [destination] map.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
    consumeEach { element ->
        val group = destination.getOrPut(keySelector(element)) { ArrayList<E>() }
        group.add(element)
    }
    return destination
}
972
/**
 * Groups the values that [valueTransform] returns for each element of this channel by the key
 * that [keySelector] returns for that element, and stores every group in the [destination] map
 * as a list under its key.
 *
 * A new [ArrayList] is created for a key the first time that key is seen;
 * lists already present in [destination] are appended to.
 *
 * @return The [destination] map.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
    consumeEach { element ->
        // The group is looked up (or created) before the value is computed.
        val group = destination.getOrPut(keySelector(element)) { ArrayList<V>() }
        group.add(valueTransform(element))
    }
    return destination
}
991
/**
 * Returns a channel that emits the result of applying [transform] to each element
 * of this channel, in the same order.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
    produce(context, onCompletion = consumes()) {
        consumeEach { element ->
            val mapped = transform(element)
            send(mapped)
        }
    }
1006
/**
 * Returns a channel that emits the result of applying [transform] to each element
 * of this channel together with its zero-based index.
 * @param [transform] function that takes the index of an element and the element itself
 * and returns the result of the transform applied to the element.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
    produce(context, onCompletion = consumes()) {
        var nextIndex = 0
        for (element in this@mapIndexed) {
            val mapped = transform(nextIndex, element)
            nextIndex++
            send(mapped)
        }
    }
1024
/**
 * Returns a channel that emits only the non-null results of applying [transform] to each
 * element of this channel together with its index.
 *
 * Implemented as [mapIndexed] followed by [filterNotNull]; the [context] is passed to [mapIndexed].
 * @param [transform] function that takes the index of an element and the element itself
 * and returns the result of the transform applied to the element.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> {
    val mapped: ReceiveChannel<R?> = mapIndexed(context, transform)
    return mapped.filterNotNull()
}
1037
/**
 * Applies [transform] to each element of this channel together with its index and adds
 * every non-null result to the [destination] collection, then returns [destination].
 * @param [transform] function that takes the index of an element and the element itself
 * and returns the result of the transform applied to the element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
    consumeEachIndexed { indexed ->
        val mapped = transform(indexed.index, indexed.value)
        if (mapped != null) destination.add(mapped)
    }
    return destination
}
1053
/**
 * Applies [transform] to each element of this channel together with its index and sends
 * every non-null result to the [destination] channel, then returns [destination].
 * @param [transform] function that takes the index of an element and the element itself
 * and returns the result of the transform applied to the element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
    consumeEachIndexed { indexed ->
        val mapped = transform(indexed.index, indexed.value)
        if (mapped != null) destination.send(mapped)
    }
    return destination
}
1069
/**
 * Applies [transform] to each element of this channel together with its zero-based index
 * and adds every result to the [destination] collection, then returns [destination].
 * @param [transform] function that takes the index of an element and the element itself
 * and returns the result of the transform applied to the element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
    var nextIndex = 0
    consumeEach { element ->
        val mapped = transform(nextIndex, element)
        nextIndex++
        destination.add(mapped)
    }
    return destination
}
1086
/**
 * Applies [transform] to each element of this channel together with its zero-based index
 * and sends every result to the [destination] channel, then returns [destination].
 * @param [transform] function that takes the index of an element and the element itself
 * and returns the result of the transform applied to the element.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
    var nextIndex = 0
    consumeEach { element ->
        val mapped = transform(nextIndex, element)
        nextIndex++
        destination.send(mapped)
    }
    return destination
}
1103
/**
 * Returns a channel that emits only the non-null results of applying [transform]
 * to each element of this channel.
 *
 * Implemented as [map] followed by [filterNotNull]; the [context] is passed to [map].
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> {
    val mapped: ReceiveChannel<R?> = map(context, transform)
    return mapped.filterNotNull()
}
1114
/**
 * Applies [transform] to each element of this channel and adds every non-null result
 * to the [destination] collection, then returns [destination].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
    consumeEach { element ->
        val mapped = transform(element)
        if (mapped != null) destination.add(mapped)
    }
    return destination
}
1128
/**
 * Applies [transform] to each element of this channel and sends every non-null result
 * to the [destination] channel, then returns [destination].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
    consumeEach { element ->
        val mapped = transform(element)
        if (mapped != null) destination.send(mapped)
    }
    return destination
}
1142
/**
 * Applies [transform] to each element of this channel and adds every result
 * to the [destination] collection, then returns [destination].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
    consumeEach { element ->
        val mapped = transform(element)
        destination.add(mapped)
    }
    return destination
}
1156
/**
 * Applies [transform] to each element of this channel and sends every result
 * to the [destination] channel, then returns [destination].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
    consumeEach { element ->
        val mapped = transform(element)
        destination.send(mapped)
    }
    return destination
}
1170
/**
 * Returns a channel that wraps each element of this channel into an [IndexedValue]
 * carrying the element and its zero-based position.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Unconfined): ReceiveChannel<IndexedValue<E>> =
    produce(context, onCompletion = consumes()) {
        var nextIndex = 0
        for (element in this@withIndex) {
            val indexed = IndexedValue(nextIndex, element)
            nextIndex++
            send(indexed)
        }
    }
1184
/**
 * Returns a channel that emits each distinct element of this channel once,
 * using the element itself as the key for [distinctBy].
 *
 * The elements in the resulting channel are in the same order as they were in the source channel.
 *
 * The operation is _intermediate_ and _stateful_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> {
    return distinctBy { element -> element }
}
1195
/**
 * Returns a channel that emits only the first element of this channel for each distinct key
 * returned by the given [selector] function.
 *
 * The elements in the resulting channel are in the same order as they were in the source channel.
 * Keys that were already seen are remembered in a [HashSet] for the lifetime of the producer.
 *
 * The operation is _intermediate_ and _stateful_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 *
 * @param context the coroutine context passed to `produce`.
 * @param selector computes the key by which elements are compared for distinctness.
 */
// todo: mark selector with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
    produce(context, onCompletion = consumes()) {
        val seenKeys = HashSet<K>()
        for (element in this@distinctBy) {
            // `add` returns true only for a key that was not present yet, so one hash
            // lookup both tests and records the key.
            if (seenKeys.add(selector(element))) {
                send(element)
            }
        }
    }
1217
/**
 * Collects all distinct elements of this channel into a new [MutableSet] (a [LinkedHashSet]).
 *
 * The returned set keeps the order in which elements were received from the original channel.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> {
    return toCollection(LinkedHashSet<E>())
}
Roman Elizarovb555d912017-08-17 21:01:33 +03001228
/**
 * Returns `true` if every element satisfies the given [predicate]
 * (and therefore also when no element is received at all).
 *
 * Returns `false` as soon as one element fails the [predicate].
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
    consumeEach { element ->
        val satisfied = predicate(element)
        if (!satisfied) return false
    }
    return true
}
1241
/**
 * Returns `true` if the channel yields at least one element.
 *
 * Only asks the channel's iterator whether a next element exists.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.any(): Boolean {
    return consume { iterator().hasNext() }
}
1252
/**
 * Returns `true` if at least one element satisfies the given [predicate].
 *
 * Returns `true` as soon as a matching element is found, and `false` when none matches.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
    consumeEach { element ->
        val matched = predicate(element)
        if (matched) return true
    }
    return false
}
1265
/**
 * Counts the elements received from this channel and returns the total.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.count(): Int {
    var total = 0
    consumeEach { _ -> total += 1 }
    return total
}
1277
/**
 * Counts the elements of this channel that satisfy the given [predicate] and returns the total.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
    var matches = 0
    consumeEach { element ->
        if (predicate(element)) matches += 1
    }
    return matches
}
1291
/**
 * Folds the channel into a single value: starts from [initial] and applies [operation]
 * to the current accumulator value and each element, from left to right.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R =
    consume {
        var result = initial
        for (element in this) {
            result = operation(result, element)
        }
        result
    }
1305
/**
 * Folds the channel into a single value: starts from [initial] and applies [operation]
 * to the current accumulator value and each element together with its index in the
 * original channel, from left to right.
 * @param [operation] function that takes the index of an element, current accumulator value
 * and the element itself, and calculates the next accumulator value.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
    var result = initial
    var position = 0
    consumeEach { element ->
        result = operation(position, result, element)
        position++
    }
    return result
}
1323
/**
 * Finds the first element for which [selector] yields the largest value,
 * or returns `null` if the channel has no elements.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
    consume {
        val elements = iterator()
        if (!elements.hasNext()) return null
        var best = elements.next()
        var bestKey = selector(best)
        while (elements.hasNext()) {
            val candidate = elements.next()
            val candidateKey = selector(candidate)
            // Strict comparison keeps the earliest element among equal keys.
            if (bestKey < candidateKey) {
                best = candidate
                bestKey = candidateKey
            }
        }
        best
    }
1346
/**
 * Finds the first element that is the largest according to the provided [comparator],
 * or returns `null` if the channel has no elements.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
    consume {
        val elements = iterator()
        if (elements.hasNext()) {
            var best = elements.next()
            while (elements.hasNext()) {
                val candidate = elements.next()
                // Strict comparison keeps the earliest element among equal ones.
                if (comparator.compare(best, candidate) < 0) best = candidate
            }
            best
        } else {
            null
        }
    }
1364
/**
 * Finds the first element for which [selector] yields the smallest value,
 * or returns `null` if the channel has no elements.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
    consume {
        val elements = iterator()
        if (!elements.hasNext()) return null
        var best = elements.next()
        var bestKey = selector(best)
        while (elements.hasNext()) {
            val candidate = elements.next()
            val candidateKey = selector(candidate)
            // Strict comparison keeps the earliest element among equal keys.
            if (bestKey > candidateKey) {
                best = candidate
                bestKey = candidateKey
            }
        }
        best
    }
1387
/**
 * Finds the first element that is the smallest according to the provided [comparator],
 * or returns `null` if the channel has no elements.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
    consume {
        val elements = iterator()
        if (elements.hasNext()) {
            var best = elements.next()
            while (elements.hasNext()) {
                val candidate = elements.next()
                // Strict comparison keeps the earliest element among equal ones.
                if (comparator.compare(best, candidate) > 0) best = candidate
            }
            best
        } else {
            null
        }
    }
1405
/**
 * Checks whether the channel yields no elements at all, returning `true` if so.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
    consume { !iterator().hasNext() }
1416
/**
 * Checks that the given [predicate] holds for no element, returning `true` if so.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean =
    consume {
        for (element in this) {
            // One matching element is enough to answer `false`.
            if (predicate(element)) return false
        }
        true
    }
1429
/**
 * Reduces the channel to a single value: the first element seeds the accumulator and
 * [operation] is then applied to the current accumulator value and each following element,
 * from left to right.
 *
 * Throws [UnsupportedOperationException] if the channel has no elements.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
    consume {
        val elements = iterator()
        if (!elements.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
        var result: S = elements.next()
        while (elements.hasNext()) {
            val element = elements.next()
            result = operation(result, element)
        }
        result
    }
1446
/**
 * Reduces the channel to a single value: the first element seeds the accumulator and
 * [operation] is then applied to the current accumulator value and each following element
 * together with its index in the original channel, from left to right.
 * @param [operation] function that takes the index of an element, current accumulator value
 * and the element itself and calculates the next accumulator value.
 *
 * Throws [UnsupportedOperationException] if the channel has no elements.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
// todo: mark operation with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
    consume {
        val elements = iterator()
        if (!elements.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
        var result: S = elements.next()
        // The element at index 0 seeded the accumulator, so the operation first sees index 1.
        var position = 1
        while (elements.hasNext()) {
            val element = elements.next()
            result = operation(position, result, element)
            position++
        }
        result
    }
1468
/**
 * Applies [selector] to each element in the channel and returns the sum of the resulting [Int] values.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int =
    consume {
        var total = 0
        for (element in this) {
            total += selector(element)
        }
        total
    }
1482
/**
 * Applies [selector] to each element in the channel and returns the sum of the resulting [Double] values.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
    var total = 0.0
    consumeEach { element ->
        // Values are added in receive order.
        total = total + selector(element)
    }
    return total
}
1496
/**
 * Returns a channel with the elements of the original channel typed as non-`null`,
 * throwing an [IllegalArgumentException] if a `null` element is encountered.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
    map { element ->
        if (element != null) element else throw IllegalArgumentException("null element found in $this.")
    }
1505
/**
 * Distributes the elements of the original channel over a pair of lists:
 * the *first* list receives the elements for which [predicate] yielded `true`,
 * the *second* list receives the elements for which [predicate] yielded `false`.
 *
 * The operation is _terminal_.
 * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
 */
public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
    val matching = ArrayList<E>()
    val rest = ArrayList<E>()
    consumeEach { element ->
        // Pick the destination list first, then append in receive order.
        val target = if (predicate(element)) matching else rest
        target.add(element)
    }
    return Pair(matching, rest)
}
1526
/**
 * Pairs up the elements of this channel and the [other] channel that have the same index
 * and returns a channel of those pairs.
 * Resulting channel has length of shortest input channel.
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
 */
public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
    zip(other) { left, right -> Pair(left, right) }
1536
/**
 * Returns a channel of values built from elements of both channels with same indexes using provided [transform].
 * Resulting channel has length of shortest input channel.
 *
 * As soon as the [other] channel has no more elements the producer finishes, so the resulting
 * channel completes without waiting for the remaining elements of the original channel.
 *
 * @param other the channel whose elements are combined with the elements of this channel.
 * @param context the coroutine context passed to [produce] for the coroutine that performs the zipping.
 * @param transform function that combines an element of this channel (`a`) with the element
 * of [other] at the same index (`b`).
 *
 * The operation is _intermediate_ and _stateless_.
 * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
 */
// todo: mark transform with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
    produce(context, onCompletion = consumesAll(this, other)) {
        val otherIterator = other.iterator()
        this@zip.consumeEach { element1 ->
            // Once `other` is exhausted no further pairs can be formed: leave the producer
            // altogether. Returning only from this lambda would keep receiving (and discarding)
            // elements of the original channel until it is closed, so the result would never
            // complete when the original channel is longer or unbounded.
            if (!otherIterator.hasNext()) return@produce
            val element2 = otherIterator.next()
            send(transform(element1, element2))
        }
    }
1553
Roman Elizarov55a66ac2018-03-12 20:15:07 +03001554