blob: 3a519e651426328a4682f5b555cd5696403a796e [file] [log] [blame]
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5@file:JvmMultifileClass
6@file:JvmName("FlowKt")
7
8package kotlinx.coroutines.flow
9
10import kotlinx.coroutines.*
11import kotlinx.coroutines.flow.internal.*
12import kotlinx.coroutines.flow.unsafeFlow as flow
13import kotlin.jvm.*
14
15/**
16 * Accumulates value starting with the first element and applying [operation] to current accumulator value and each element.
17 * Throws [UnsupportedOperationException] if flow was empty.
18 */
19@FlowPreview
20public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
Roman Elizarov9b059082019-05-24 01:44:03 +030021 var accumulator: Any? = NULL
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030022
23 collect { value ->
Roman Elizarov9b059082019-05-24 01:44:03 +030024 accumulator = if (accumulator !== NULL) {
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030025 @Suppress("UNCHECKED_CAST")
26 operation(accumulator as S, value)
27 } else {
28 value
29 }
30 }
31
Roman Elizarov9b059082019-05-24 01:44:03 +030032 if (accumulator === NULL) throw UnsupportedOperationException("Empty flow can't be reduced")
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030033 @Suppress("UNCHECKED_CAST")
34 return accumulator as S
35}
36
37/**
38 * Accumulates value starting with [initial] value and applying [operation] current accumulator value and each element
39 */
40@FlowPreview
Vsevolod Tolstopyatova9f8c0d2019-04-30 15:05:18 +030041public suspend inline fun <T, R> Flow<T>.fold(
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030042 initial: R,
Vsevolod Tolstopyatova9f8c0d2019-04-30 15:05:18 +030043 crossinline operation: suspend (acc: R, value: T) -> R
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030044): R {
45 var accumulator = initial
46 collect { value ->
47 accumulator = operation(accumulator, value)
48 }
49 return accumulator
50}
51
52/**
53 * Terminal operator, that awaits for one and only one value to be published.
54 * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
55 * that contains more than one element.
56 */
57@FlowPreview
58public suspend fun <T> Flow<T>.single(): T {
Roman Elizarov9b059082019-05-24 01:44:03 +030059 var result: Any? = NULL
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030060 collect { value ->
Roman Elizarov9b059082019-05-24 01:44:03 +030061 if (result !== NULL) error("Expected only one element")
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030062 result = value
63 }
64
Roman Elizarov9b059082019-05-24 01:44:03 +030065 if (result === NULL) throw NoSuchElementException("Expected at least one element")
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030066 @Suppress("UNCHECKED_CAST")
67 return result as T
68}
69
70/**
71 * Terminal operator, that awaits for one and only one value to be published.
72 * Throws [IllegalStateException] for flow that contains more than one element.
73 */
74@FlowPreview
75public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
76 var result: T? = null
77 collect { value ->
78 if (result != null) error("Expected only one element")
79 result = value
80 }
81
82 return result
83}