blob: 584178d8c4be908a0fd6cfa6bf72c41f81423e75 [file] [log] [blame]
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
5
6package kotlinx.coroutines.flow.internal
7
8import kotlinx.coroutines.*
9import kotlinx.coroutines.channels.*
10import kotlinx.coroutines.flow.*
Vsevolod Tolstopyatovd5338482019-08-06 16:08:54 +030011import kotlinx.coroutines.internal.*
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030012import kotlinx.coroutines.selects.*
13
14internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
15
Vsevolod Tolstopyatovd5338482019-08-06 16:08:54 +030016internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030017 first: Flow<T1>, second: Flow<T2>,
Vsevolod Tolstopyatovd5338482019-08-06 16:08:54 +030018 transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030019) {
20 coroutineScope {
21 val firstChannel = asFairChannel(first)
22 val secondChannel = asFairChannel(second)
23 var firstValue: Any? = null
24 var secondValue: Any? = null
25 var firstIsClosed = false
26 var secondIsClosed = false
27 while (!firstIsClosed || !secondIsClosed) {
28 select<Unit> {
29 onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
30 firstValue = value
31 if (secondValue !== null) {
32 transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
33 }
34 }
35
36 onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
37 secondValue = value
38 if (firstValue !== null) {
39 transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
40 }
41 }
42 }
43 }
44 }
45}
46
47@PublishedApi
Vsevolod Tolstopyatovd5338482019-08-06 16:08:54 +030048internal suspend fun <R, T> FlowCollector<R>.combineInternal(
49 flows: Array<out Flow<T>>,
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030050 arrayFactory: () -> Array<T?>,
51 transform: suspend FlowCollector<R>.(Array<T>) -> Unit
Vsevolod Tolstopyatov340d5012019-10-21 17:29:11 +030052): Unit = coroutineScope {
53 val size = flows.size
54 val channels = Array(size) { asFairChannel(flows[it]) }
55 val latestValues = arrayOfNulls<Any?>(size)
56 val isClosed = Array(size) { false }
57 var nonClosed = size
58 var remainingNulls = size
59 // See flow.combine(other) for explanation.
60 while (nonClosed != 0) {
61 select<Unit> {
62 for (i in 0 until size) {
63 onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value ->
64 if (latestValues[i] == null) --remainingNulls
65 latestValues[i] = value
66 if (remainingNulls != 0) return@onReceive
67 val arguments = arrayFactory()
68 for (index in 0 until size) {
69 arguments[index] = NULL.unbox(latestValues[index])
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030070 }
Vsevolod Tolstopyatov340d5012019-10-21 17:29:11 +030071 transform(arguments as Array<T>)
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030072 }
73 }
74 }
75 }
76}
77
78private inline fun SelectBuilder<Unit>.onReceive(
79 isClosed: Boolean,
80 channel: ReceiveChannel<Any>,
81 crossinline onClosed: () -> Unit,
82 noinline onReceive: suspend (value: Any) -> Unit
83) {
84 if (isClosed) return
Vsevolod Tolstopyatov340d5012019-10-21 17:29:11 +030085 @Suppress("DEPRECATION")
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +030086 channel.onReceiveOrNull {
87 // TODO onReceiveOrClosed when boxing issues are fixed
88 if (it === null) onClosed()
89 else onReceive(it)
90 }
91}
92
93// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
94private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
95 val channel = channel as ChannelCoroutine<Any>
96 flow.collect { value ->
97 return@collect channel.sendFair(value ?: NULL)
98 }
99}
100
101internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
102 coroutineScope {
103 val first = asChannel(flow)
104 val second = asChannel(flow2)
105 /*
106 * This approach only works with rendezvous channel and is required to enforce correctness
107 * in the following scenario:
108 * ```
109 * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
110 * val f2 = flowOf(1)
111 * f1.zip(f2) { ... }
112 * ```
113 *
114 * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
115 */
116 (second as SendChannel<*>).invokeOnClose {
Vsevolod Tolstopyatovecbfa6d2019-10-21 20:25:45 +0300117 if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow))
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +0300118 }
119
120 val otherIterator = second.iterator()
121 try {
122 first.consumeEach { value ->
123 if (!otherIterator.hasNext()) {
124 return@consumeEach
125 }
126 emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
127 }
128 } catch (e: AbortFlowException) {
Vsevolod Tolstopyatovecbfa6d2019-10-21 20:25:45 +0300129 e.checkOwnership(owner = this@unsafeFlow)
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +0300130 } finally {
Vsevolod Tolstopyatovecbfa6d2019-10-21 20:25:45 +0300131 if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
Vsevolod Tolstopyatov44c4c562019-07-25 16:17:12 +0300132 }
133 }
134}
135
136// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
137private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
138 flow.collect { value ->
139 return@collect channel.send(value ?: NULL)
140 }
141}