New flow builder: channelFlow (and its alias callbackFlow) and supple⦠(#1214)
New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method
Rationale:
* Can be used in different context without breaking context preservation
* Can be used to build concurrent operators such as merge
* Can be used to integrate with callbacks
* Is less error-prone than flowViaChannel because requires explicit await() call
Partially fixes #1210
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index f986b64..dc3180b 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -718,6 +718,8 @@
}
public final class kotlinx/coroutines/channels/ProduceKt {
+ public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
@@ -796,6 +798,10 @@
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
+ public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
+ public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
+ public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 9d1df1d..8d34265 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -26,6 +26,36 @@
}
/**
+ * Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
+ * and invokes the given [block] before resuming the coroutine.
+ *
+ * Note that when producer channel is cancelled this function resumes with cancellation exception,
+ * so putting the code after calling this function would not lead to its execution in case of cancellation.
+ * That is why this code takes a lambda parameter.
+ *
+ * Example of usage:
+ * ```
+ * val callbackEventsStream = produce {
+ * val disposable = registerChannelInCallback(channel)
+ * awaitClose { disposable.dispose() }
+ * }
+ * ```
+ */
+@ExperimentalCoroutinesApi
+public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
+ check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoke only from the producer context" }
+ try {
+ suspendCancellableCoroutine<Unit> { cont ->
+ invokeOnClose {
+ cont.resume(Unit)
+ }
+ }
+ } finally {
+ block()
+ }
+}
+
+/**
* Launches new coroutine to produce a stream of values by sending them to a channel
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
* object can be used to [receive][ReceiveChannel.receive] elements produced by this coroutine.
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 06a5c00..733bf63 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -200,38 +200,14 @@
}
/**
- * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
- * that is provided to the builder's [block] of code. It allows elements to be
- * produced by the code that is running in a different context, e.g. from a callback-based API.
- *
- * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
- * on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
- * should be used instead.
- *
- * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
- * The provided channel can later be used by any external service to communicate with flow and its buffer determines
- * backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
- *
- * Example of usage:
- * ```
- * fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
- * val callback = object : Callback { // implementation of some callback interface
- * override fun onNextValue(value: T) {
- * channel.offer(value) // Note: offer drops value when buffer is full
- * }
- * override fun onApiError(cause: Throwable) {
- * channel.cancel("API Error", CancellationException(cause))
- * }
- * override fun onCompleted() = channel.close()
- * }
- * api.register(callback)
- * channel.invokeOnClose {
- * api.unregister(callback)
- * }
- * }
- * ```
+ * @suppress
*/
@FlowPreview
+@Deprecated(
+ message = "Use channelFlow instead",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
+)
public fun <T> flowViaChannel(
bufferSize: Int = 16,
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
@@ -249,3 +225,90 @@
}
}
}
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
+ * The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [awaitClose] can be used.
+ * For more detailed example please refer to [callbackFlow] documentation.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Examples of usage:
+ * ```
+ * fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
+ * launch {
+ * collect { value -> send(value) }
+ * }
+ * other.collect { value -> send(value) }
+ * }
+ *
+ * fun <T> contextualFlow(): Flow<T> = channelFlow {
+ * launch(Dispatchers.IO) {
+ * send(computeIoValue())
+ * }
+ *
+ * launch(Dispatchers.Default) {
+ * send(computeCpuValue())
+ * }
+ * }
+ * ```
+ */
+@FlowPreview
+public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+ flow {
+ coroutineScope {
+ val channel = produce(capacity = bufferSize, block = block)
+ channel.consumeEach { value ->
+ emit(value)
+ }
+ }
+ }
+
+/**
+ * Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
+ * that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
+ * produced by the code that is running in a different context or running concurrently.
+ *
+ * The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
+ * on the resulting flow.
+ *
+ * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
+ * e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
+ * callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
+ * [awaitClose] extension should be used. [awaitClose] argument will be invoked when either flow consumer cancels flow collection
+ * or when callback-based API invokes [SendChannel.close] manually.
+ *
+ * To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
+ * The provided channel can later be used by any external service to communicate with the flow and its buffer determines
+ * backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
+ *
+ * Example of usage:
+ * ```
+ * fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
+ * val callback = object : Callback { // implementation of some callback interface
+ * override fun onNextValue(value: T) {
+ * // Note: offer drops value when buffer is full
+ * // Channel.UNLIMITED can be used to avoid overfill
+ * offer(value)
+ * }
+ * override fun onApiError(cause: Throwable) {
+ * cancel(CancellationException("API Error", cause))
+ * }
+ * override fun onCompleted() = channel.close()
+ * }
+ * api.register(callback)
+ * // Suspend until either onCompleted or external cancellation are invoked
+ * await { api.unregister(callback) }
+ * }
+ * ```
+ */
+public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
+ channelFlow(bufferSize) { block() }
diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
index 5137dd7..cbaf708 100644
--- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
@@ -94,6 +94,60 @@
cancelOnCompletion(coroutineContext)
}
+ @Test
+ fun testAwaitConsumerCancellation() = runTest {
+ val parent = Job()
+ val channel = produce<Int>(parent) {
+ expect(2)
+ awaitClose { expect(4) }
+ }
+ expect(1)
+ yield()
+ expect(3)
+ channel.cancel()
+ parent.complete()
+ parent.join()
+ finish(5)
+ }
+
+ @Test
+ fun testAwaitProducerCancellation() = runTest {
+ val parent = Job()
+ produce<Int>(parent) {
+ expect(2)
+ launch {
+ expect(3)
+ this@produce.cancel()
+ }
+ awaitClose { expect(4) }
+ }
+ expect(1)
+ parent.complete()
+ parent.join()
+ finish(5)
+ }
+
+ @Test
+ fun testAwaitParentCancellation() = runTest {
+ val parent = Job()
+ produce<Int>(parent) {
+ expect(2)
+ awaitClose { expect(4) }
+ }
+ expect(1)
+ yield()
+ expect(3)
+ parent.cancelAndJoin()
+ finish(5)
+ }
+
+ @Test
+ fun testAwaitIllegalState() = runTest {
+ val channel = produce<Int> { }
+ @Suppress("RemoveExplicitTypeArguments") // KT-31525
+ assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).awaitClose<Nothing>() }
+ }
+
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
val source = Channel<Int>()
expect(1)
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
new file mode 100644
index 0000000..3c74b0f
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class ChannelBuildersFlowTest : TestBase() {
+ @Test
+ fun testBroadcastChannelAsFlow() = runTest {
+ val channel = broadcast {
+ repeat(10) {
+ send(it + 1)
+ }
+ }
+
+ val sum = channel.asFlow().sum()
+ assertEquals(55, sum)
+ }
+
+ @Test
+ fun testExceptionInBroadcast() = runTest {
+ expect(1)
+ val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
+ repeat(10) {
+ send(it + 1)
+ }
+ throw TestException()
+ }
+ assertEquals(15, channel.asFlow().take(5).sum())
+
+ // Workaround for JS bug
+ try {
+ channel.asFlow().collect { /* Do nothing */ }
+ expectUnreached()
+ } catch (e: TestException) {
+ finish(2)
+ }
+ }
+
+ @Test
+ fun testBroadcastChannelAsFlowLimits() = runTest {
+ val channel = BroadcastChannel<Int>(1)
+ val flow = channel.asFlow().map { it * it }.drop(1).take(2)
+
+ var expected = 0
+ launch {
+ assertTrue(channel.offer(1)) // Handed to the coroutine
+ assertTrue(channel.offer(2)) // Buffered
+ assertFalse(channel.offer(3)) // Failed to offer
+ channel.send(3)
+ yield()
+ assertEquals(1, expected)
+ assertTrue(channel.offer(4)) // Handed to the coroutine
+ assertTrue(channel.offer(5)) // Buffered
+ assertFalse(channel.offer(6)) // Failed to offer
+ channel.send(6)
+ assertEquals(2, expected)
+ }
+
+ val sum = flow.sum()
+ assertEquals(13, sum)
+ ++expected
+ val sum2 = flow.sum()
+ assertEquals(61, sum2)
+ ++expected
+ }
+
+ @Test
+ fun flowAsBroadcast() = runTest {
+ val flow = flow {
+ repeat(10) {
+ emit(it)
+ }
+ }
+
+ val channel = flow.broadcastIn(this)
+ assertEquals((0..9).toList(), channel.openSubscription().toList())
+ }
+
+ @Test
+ fun flowAsBroadcastMultipleSubscription() = runTest {
+ val flow = flow {
+ repeat(10) {
+ emit(it)
+ }
+ }
+
+ val broadcast = flow.broadcastIn(this)
+ val channel = broadcast.openSubscription()
+ val channel2 = broadcast.openSubscription()
+
+ assertEquals(0, channel.receive())
+ assertEquals(0, channel2.receive())
+ yield()
+ assertEquals(1, channel.receive())
+ assertEquals(1, channel2.receive())
+
+ channel.cancel()
+ channel2.cancel()
+ yield()
+ ensureActive()
+ }
+
+ @Test
+ fun flowAsBroadcastException() = runTest {
+ val flow = flow {
+ repeat(10) {
+ emit(it)
+ }
+
+ throw TestException()
+ }
+
+ val channel = flow.broadcastIn(this + NonCancellable)
+ assertFailsWith<TestException> { channel.openSubscription().toList() }
+ assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
+ }
+
+ // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
+ @Test
+ fun testFlowAsBroadcastAsFlow() = runTest {
+ val flow = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ }.broadcastIn(this).asFlow()
+
+ assertEquals(6, flow.sum())
+ assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
+ }
+
+ @Test
+ fun testBroadcastAsFlowAsBroadcast() = runTest {
+ val channel = broadcast {
+ send(1)
+ }.asFlow().broadcastIn(this)
+
+ channel.openSubscription().consumeEach {
+ assertEquals(1, it)
+ }
+
+ channel.openSubscription().consumeEach {
+ fail()
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index c2051e7..5d0292e 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -10,141 +10,72 @@
class ChannelFlowTest : TestBase() {
@Test
- fun testBroadcastChannelAsFlow() = runTest {
- val channel = broadcast {
- repeat(10) {
- send(it + 1)
- }
+ fun testRegular() = runTest {
+ val flow = channelFlow {
+ assertTrue(offer(1))
+ assertTrue(offer(2))
+ assertTrue(offer(3))
}
-
- val sum = channel.asFlow().sum()
- assertEquals(55, sum)
+ assertEquals(listOf(1, 2, 3), flow.toList())
}
@Test
- fun testExceptionInBroadcast() = runTest {
+ fun testBuffer() = runTest {
+ val flow = channelFlow(bufferSize = 1) {
+ assertTrue(offer(1))
+ assertTrue(offer(2))
+ assertFalse(offer(3))
+ }
+ assertEquals(listOf(1, 2), flow.toList())
+ }
+
+ @Test
+ fun testConflated() = runTest {
+ val flow = channelFlow(bufferSize = Channel.CONFLATED) {
+ assertTrue(offer(1))
+ assertTrue(offer(2))
+ }
+ assertEquals(listOf(1), flow.toList())
+ }
+
+ @Test
+ fun testFailureCancelsChannel() = runTest {
+ val flow = channelFlow {
+ offer(1)
+ invokeOnClose {
+ expect(2)
+ }
+ }.onEach { throw TestException() }
+
expect(1)
- val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
- repeat(10) {
- send(it + 1)
+ assertFailsWith<TestException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testFailureInSourceCancelsConsumer() = runTest {
+ val flow = channelFlow<Int> {
+ expect(2)
+ throw TestException()
+ }.onEach { expectUnreached() }
+
+ expect(1)
+ assertFailsWith<TestException>(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testScopedCancellation() = runTest {
+ val flow = channelFlow<Int> {
+ expect(2)
+ launch(start = CoroutineStart.ATOMIC) {
+ hang { expect(3) }
}
throw TestException()
- }
- assertEquals(15, channel.asFlow().take(5).sum())
+ }.onEach { expectUnreached() }
- // Workaround for JS bug
- try {
- channel.asFlow().collect { /* Do nothing */ }
- expectUnreached()
- } catch (e: TestException) {
- finish(2)
- }
- }
-
- @Test
- fun testBroadcastChannelAsFlowLimits() = runTest {
- val channel = BroadcastChannel<Int>(1)
- val flow = channel.asFlow().map { it * it }.drop(1).take(2)
-
- var expected = 0
- launch {
- assertTrue(channel.offer(1)) // Handed to the coroutine
- assertTrue(channel.offer(2)) // Buffered
- assertFalse(channel.offer(3)) // Failed to offer
- channel.send(3)
- yield()
- assertEquals(1, expected)
- assertTrue(channel.offer(4)) // Handed to the coroutine
- assertTrue(channel.offer(5)) // Buffered
- assertFalse(channel.offer(6)) // Failed to offer
- channel.send(6)
- assertEquals(2, expected)
- }
-
- val sum = flow.sum()
- assertEquals(13, sum)
- ++expected
- val sum2 = flow.sum()
- assertEquals(61, sum2)
- ++expected
- }
-
- @Test
- fun flowAsBroadcast() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
- }
-
- val channel = flow.broadcastIn(this)
- assertEquals((0..9).toList(), channel.openSubscription().toList())
- }
-
- @Test
- fun flowAsBroadcastMultipleSubscription() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
- }
-
- val broadcast = flow.broadcastIn(this)
- val channel = broadcast.openSubscription()
- val channel2 = broadcast.openSubscription()
-
- assertEquals(0, channel.receive())
- assertEquals(0, channel2.receive())
- yield()
- assertEquals(1, channel.receive())
- assertEquals(1, channel2.receive())
-
- channel.cancel()
- channel2.cancel()
- yield()
- ensureActive()
- }
-
- @Test
- fun flowAsBroadcastException() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
-
- throw TestException()
- }
-
- val channel = flow.broadcastIn(this + NonCancellable)
- assertFailsWith<TestException> { channel.openSubscription().toList() }
- assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
- }
-
- // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
- @Test
- fun testFlowAsBroadcastAsFlow() = runTest {
- val flow = flow {
- emit(1)
- emit(2)
- emit(3)
- }.broadcastIn(this).asFlow()
-
- assertEquals(6, flow.sum())
- assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
- }
-
- @Test
- fun testBroadcastAsFlowAsBroadcast() = runTest {
- val channel = broadcast {
- send(1)
- }.asFlow().broadcastIn(this)
-
- channel.openSubscription().consumeEach {
- assertEquals(1, it)
- }
-
- channel.openSubscription().consumeEach {
- fail()
- }
+ expect(1)
+ assertFailsWith<TestException>(flow)
+ finish(4)
}
}
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
new file mode 100644
index 0000000..d992d06
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/channels/FlowCallbackTest.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class FlowCallbackTest : TestBase() {
+ @Test
+ fun testClosedPrematurely() = runTest(unhandled = listOf({ e -> e is ClosedSendChannelException })) {
+ val outerScope = this
+ val flow = channelFlow {
+ // ~ callback-based API
+ outerScope.launch(Job()) {
+ expect(2)
+ send(1)
+ expectUnreached()
+ }
+ expect(1)
+ }
+ assertEquals(emptyList(), flow.toList())
+ finish(3)
+ }
+
+ @Test
+ fun testNotClosedPrematurely() = runTest {
+ val outerScope = this
+ val flow = channelFlow<Int> {
+ // ~ callback-based API
+ outerScope.launch(Job()) {
+ expect(2)
+ send(1)
+ close()
+ }
+ expect(1)
+ awaitClose()
+ }
+
+ assertEquals(listOf(1), flow.toList())
+ finish(3)
+ }
+}
+
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt
deleted file mode 100644
index 364cd84..0000000
--- a/kotlinx-coroutines-core/common/test/flow/channels/FlowViaChannelTest.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlin.test.*
-
-class FlowViaChannelTest : TestBase() {
- @Test
- fun testRegular() = runTest {
- val flow = flowViaChannel<Int> {
- assertTrue(it.offer(1))
- assertTrue(it.offer(2))
- assertTrue(it.offer(3))
- it.close()
- }
- assertEquals(listOf(1, 2, 3), flow.toList())
- }
-
- @Test
- fun testBuffer() = runTest {
- val flow = flowViaChannel<Int>(bufferSize = 1) {
- assertTrue(it.offer(1))
- assertTrue(it.offer(2))
- assertFalse(it.offer(3))
- it.close()
- }
- assertEquals(listOf(1, 2), flow.toList())
- }
-
- @Test
- fun testConflated() = runTest {
- val flow = flowViaChannel<Int>(bufferSize = Channel.CONFLATED) {
- assertTrue(it.offer(1))
- assertTrue(it.offer(2))
- it.close()
- }
- assertEquals(listOf(1), flow.toList())
- }
-
- @Test
- fun testFailureCancelsChannel() = runTest {
- val flow = flowViaChannel<Int> {
- it.offer(1)
- it.invokeOnClose {
- expect(2)
- }
- }.onEach { throw TestException() }
-
- expect(1)
- assertFailsWith<TestException>(flow)
- finish(3)
- }
-
- @Test
- fun testFailureInSourceCancelsConsumer() = runTest {
- val flow = flowViaChannel<Int> {
- expect(2)
- throw TestException()
- }.onEach { expectUnreached() }
-
- expect(1)
- assertFailsWith<TestException>(flow)
- finish(3)
- }
-
- @Test
- fun testScopedCancellation() = runTest {
- val flow = flowViaChannel<Int> {
- expect(2)
- launch(start = CoroutineStart.ATOMIC) {
- hang { expect(3) }
- }
- throw TestException()
- }.onEach { expectUnreached() }
-
- expect(1)
- assertFailsWith<TestException>(flow)
- finish(4)
- }
-}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
similarity index 80%
rename from kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt
rename to kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
index 9d7799c..0a66ae6 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/FlowFromChannelTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
@@ -10,7 +10,7 @@
import kotlin.concurrent.*
import kotlin.test.*
-class FlowFromChannelTest : TestBase() {
+class CallbackFlowTest : TestBase() {
private class CallbackApi(val block: (SendChannel<Int>) -> Unit) {
var started = false
@@ -39,9 +39,9 @@
runCatching { it.offer(++i) }
}
- val flow = flowViaChannel<Int> { channel ->
+ val flow = channelFlow<Int>(16) {
api.start(channel)
- channel.invokeOnClose {
+ awaitClose {
api.stop()
}
}
@@ -83,9 +83,9 @@
}
}
- val flow = flowViaChannel<Int> { channel ->
+ val flow = channelFlow<Int> {
api.start(channel)
- channel.invokeOnClose {
+ awaitClose {
api.stop()
}
}
@@ -106,4 +106,22 @@
assertTrue(api.started)
assertTrue(api.stopped)
}
+
+
+ @Test
+ fun testMergeExample() = runTest {
+ // Too slow on JS
+ withContext(Dispatchers.Default) {
+ val f1 = (1..10_000).asFlow()
+ val f2 = (10_001..20_000).asFlow()
+ assertEquals((1..20_000).toSet(), f1.merge(f2).toSet())
+ }
+ }
+
+ private fun Flow<Int>.merge(other: Flow<Int>): Flow<Int> = channelFlow {
+ launch {
+ collect { send(it) }
+ }
+ other.collect { send(it) }
+ }
}