blob: f82011fb47be761bb90244d44e320ba9a298d4e5 [file] [log] [blame]
Francesco Vasco66d18c02017-08-04 18:10:09 +02001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Francesco Vasco66d18c02017-08-04 18:10:09 +02003 */
4
5package kotlinx.coroutines.experimental.channels8
6
7import kotlinx.coroutines.experimental.CommonPool
Francesco Vasco66d18c02017-08-04 18:10:09 +02008import kotlinx.coroutines.experimental.channels.ReceiveChannel
9import kotlinx.coroutines.experimental.channels.consumeEach
10import kotlinx.coroutines.experimental.channels.produce
11import kotlinx.coroutines.experimental.runBlocking
12import java.util.*
13import java.util.function.BiConsumer
14import java.util.function.Consumer
15import java.util.stream.Collector
16import java.util.stream.Stream
17import java.util.stream.StreamSupport
18import kotlin.coroutines.experimental.CoroutineContext
19
/**
 * Creates a [ReceiveChannel] that emits all elements of this [Stream].
 *
 * The stream is iterated by a producer coroutine started with [produce] in the given [context].
 * Once that coroutine stops iterating (normal completion, failure or cancellation while iterating)
 * the stream is closed, so that close handlers registered via [Stream.onClose] are run and
 * resources held by the stream are released.
 *
 * @param context the coroutine context the producer coroutine runs in; defaults to [CommonPool].
 * @return a channel that receives the stream's elements in iteration order.
 */
public fun <E> Stream<E>.asReceiveChannel(context: CoroutineContext = CommonPool): ReceiveChannel<E> = produce(context) {
    // Inside `produce`, an unqualified `close()` would resolve to the producer's channel,
    // so keep an explicit reference to the stream being consumed.
    val source = this@asReceiveChannel
    try {
        for (element in source)
            send(element)
    } finally {
        source.close()
    }
}
27
/**
 * Exposes the elements of this [ReceiveChannel] as a sequential (non-parallel) [Stream].
 *
 * The returned stream is backed by a [Spliterator] adapter that reads from this channel.
 */
public fun <E : Any> ReceiveChannel<E>.asStream(): Stream<E> {
    val spliterator = SpliteratorAdapter(this)
    return StreamSupport.stream<E>(spliterator, false)
}
32
/**
 * Reduces the elements of this [ReceiveChannel] with the given [collector].
 *
 * A fresh result container is obtained from the collector's supplier, every element received
 * from the channel is folded into it with the collector's accumulator, and the collector's
 * finisher converts the container into the final result.
 *
 * @param collector the [Collector] describing the reduction.
 * @return the value produced by the collector's finisher.
 */
public suspend fun <T, A : Any, R> ReceiveChannel<T>.collect(collector: Collector<T, A, R>): R {
    val state: A = collector.supplier().get()
    val fold: BiConsumer<A, T> = collector.accumulator()
    consumeEach { element -> fold.accept(state, element) }
    val finisher = collector.finisher()
    return finisher.apply(state)
}
42
/**
 * [Spliterator] implementation that pulls its elements from a [ReceiveChannel].
 *
 * Each traversal call wraps the suspending channel operations in [runBlocking].
 * The spliterator cannot be split and reports an unknown size.
 */
private class SpliteratorAdapter<E : Any>(val channel: ReceiveChannel<E>) : Spliterator<E> {
    /** The remaining element count is not known, which is reported as [Long.MAX_VALUE]. */
    override fun estimateSize(): Long = Long.MAX_VALUE

    /** Passes every element still obtainable from the channel to [action]. */
    override fun forEachRemaining(action: Consumer<in E>) {
        runBlocking {
            val remaining = channel.iterator()
            while (remaining.hasNext()) {
                action.accept(remaining.next())
            }
        }
    }

    /**
     * Receives a single element and hands it to [action].
     *
     * @return `true` if an element was consumed, `false` if `receiveOrNull` yielded `null`.
     */
    override fun tryAdvance(action: Consumer<in E>): Boolean = runBlocking {
        val next = channel.receiveOrNull() ?: return@runBlocking false
        action.accept(next)
        true
    }

    /** Reports the fixed characteristic flags of this spliterator. */
    override fun characteristics(): Int = CHARACTERISTICS

    /** Splitting is not supported, so this always returns `null`. */
    override fun trySplit(): Spliterator<E>? = null

    private companion object {
        // Encounter order is defined and elements are never null (E : Any).
        @JvmStatic
        private val CHARACTERISTICS = Spliterator.ORDERED or Spliterator.NONNULL
    }
}