blob: 29777b7a83d77eecaa2bde12a7f054d2ed5dd4d9 [file] [log] [blame]
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5@file:JvmMultifileClass
6@file:JvmName("FlowKt")
7
8package kotlinx.coroutines.flow
9
10import kotlinx.coroutines.*
Vsevolod Tolstopyatov997d2f22019-05-24 12:41:40 +030011import kotlinx.coroutines.internal.*
12import kotlin.coroutines.*
Vsevolod Tolstopyatovd57bfa22019-04-04 14:25:13 +030013import kotlin.jvm.*
14import kotlinx.coroutines.flow.unsafeFlow as flow
15
/**
 * Predicate over a [Throwable], used by the error-handling operators in this file
 * to decide whether a given exception should be handled by the operator.
 */
public typealias ExceptionPredicate = (Throwable) -> Boolean

// Default predicate for the operators below: accepts every exception.
private val ALWAYS_TRUE: ExceptionPredicate = { _ -> true }
19
/**
 * Continues with the [fallback] flow when the original flow fails with an exception accepted by the [predicate].
 * An exception rejected by the [predicate] is rethrown as is.
 * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
 */
@FlowPreview
public fun <T> Flow<T>.onErrorCollect(
    fallback: Flow<T>,
    predicate: ExceptionPredicate = ALWAYS_TRUE
): Flow<T> = collectSafely { cause ->
    if (predicate(cause)) {
        // The failure is accepted: replay the fallback flow into the same collector.
        emitAll(fallback)
    } else {
        throw cause
    }
}
32
/**
 * Emits the single [fallback] value and then completes normally when the original flow fails
 * with an exception accepted by the given [predicate].
 * An exception rejected by the [predicate] is rethrown as is.
 * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
 */
@FlowPreview
public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: ExceptionPredicate = ALWAYS_TRUE): Flow<T> =
    collectSafely { cause ->
        if (predicate(cause)) {
            emit(fallback)
        } else {
            throw cause
        }
    }
43
/**
 * Operator that retries collection of the given flow up to [retries] times if an exception that matches
 * the given [predicate] occurs in the given flow. Exceptions from collectors of this flow are not retried.
 * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
 *
 * @throws IllegalArgumentException if [retries] is not positive.
 */
@FlowPreview
public fun <T> Flow<T>.retry(
    retries: Int = Int.MAX_VALUE,
    predicate: ExceptionPredicate = ALWAYS_TRUE
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return flow {
        // Number of re-collections still allowed after a failure.
        var attemptsLeft = retries
        var completed = false
        while (!completed) {
            // Set when the failure was thrown by emit(), i.e. by a downstream operator or collector.
            var downstreamFailed = false
            try {
                collect { value ->
                    try {
                        emit(value)
                    } catch (e: Throwable) {
                        downstreamFailed = true
                        throw e
                    }
                }
                // Upstream finished without an exception: stop looping.
                completed = true
            } catch (e: Throwable) {
                // Short-circuit order matters: the predicate is not consulted for downstream failures or
                // cancellation, and the counter is only decremented once the predicate has accepted the failure.
                val retryAllowed = !downstreamFailed &&
                    !e.isCancellationCause(coroutineContext) &&
                    predicate(e) &&
                    attemptsLeft-- != 0
                if (!retryAllowed) throw e
            }
        }
    }
}
78
/**
 * Returns `true` when the [Job] found in [coroutineContext] is cancelled and its cancellation exception,
 * after [unwrap], is equal to this throwable after [unwrap].
 * Returns `false` when the context has no job or the job is not cancelled.
 */
private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean {
    val cancelledJob = coroutineContext[Job]?.takeIf { it.isCancelled } ?: return false
    val cancellationCause = unwrap(cancelledJob.getCancellationException())
    return cancellationCause == unwrap(this)
}
84
/**
 * Re-emits every value of the receiver flow and invokes [onException] when collecting the receiver fails.
 *
 * [onException] is not invoked, and the exception is rethrown instead, when the exception was thrown
 * while emitting a value downstream or when it is the cancellation cause of the current job
 * (see [isCancellationCause]).
 */
private fun <T> Flow<T>.collectSafely(onException: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T> =
    flow {
        // Set when the failure was thrown by emit(), i.e. by a downstream operator; such failures are not handled here.
        var downstreamFailed = false
        try {
            collect { value ->
                try {
                    emit(value)
                } catch (e: Throwable) {
                    downstreamFailed = true
                    throw e
                }
            }
        } catch (e: Throwable) {
            if (downstreamFailed) throw e
            if (e.isCancellationCause(coroutineContext)) throw e
            onException(e)
        }
    }