Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
| 5 | @file:JvmMultifileClass |
| 6 | @file:JvmName("FlowKt") |
| 7 | |
| 8 | package kotlinx.coroutines.flow |
| 9 | |
| 10 | import kotlinx.coroutines.* |
Vsevolod Tolstopyatov | 997d2f2 | 2019-05-24 12:41:40 +0300 | [diff] [blame] | 11 | import kotlinx.coroutines.internal.* |
| 12 | import kotlin.coroutines.* |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 13 | import kotlin.jvm.* |
| 14 | import kotlinx.coroutines.flow.unsafeFlow as flow |
| 15 | |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 16 | public typealias ExceptionPredicate = (Throwable) -> Boolean |
| 17 | |
| 18 | private val ALWAYS_TRUE: ExceptionPredicate = { true } |
| 19 | |
| 20 | /** |
| 21 | * Switches to the [fallback] flow if the original flow throws an exception that matches the [predicate]. |
Vsevolod Tolstopyatov | 997d2f2 | 2019-05-24 12:41:40 +0300 | [diff] [blame] | 22 | * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator. |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 23 | */ |
| 24 | @FlowPreview |
| 25 | public fun <T> Flow<T>.onErrorCollect( |
| 26 | fallback: Flow<T>, |
| 27 | predicate: ExceptionPredicate = ALWAYS_TRUE |
| 28 | ): Flow<T> = collectSafely { e -> |
| 29 | if (!predicate(e)) throw e |
Vsevolod Tolstopyatov | d5478b6 | 2019-06-06 11:43:31 +0300 | [diff] [blame^] | 30 | emitAll(fallback) |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 31 | } |
| 32 | |
| 33 | /** |
Vsevolod Tolstopyatov | 997d2f2 | 2019-05-24 12:41:40 +0300 | [diff] [blame] | 34 | * Emits the [fallback] value and finishes successfully if the original flow throws exception that matches the given [predicate]. |
| 35 | * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator. |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 36 | */ |
| 37 | @FlowPreview |
| 38 | public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: ExceptionPredicate = ALWAYS_TRUE): Flow<T> = |
| 39 | collectSafely { e -> |
| 40 | if (!predicate(e)) throw e |
| 41 | emit(fallback) |
| 42 | } |
| 43 | |
| 44 | /** |
| 45 | * Operator that retries [n][retries] times to collect the given flow in an exception that matches the given [predicate] occurs |
| 46 | * in the given flow. Exceptions from collectors of this flow are not retried. |
Vsevolod Tolstopyatov | 997d2f2 | 2019-05-24 12:41:40 +0300 | [diff] [blame] | 47 | * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator. |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 48 | */ |
| 49 | @FlowPreview |
| 50 | public fun <T> Flow<T>.retry( |
| 51 | retries: Int = Int.MAX_VALUE, |
| 52 | predicate: ExceptionPredicate = ALWAYS_TRUE |
| 53 | ): Flow<T> { |
| 54 | require(retries > 0) { "Expected positive amount of retries, but had $retries" } |
| 55 | return flow { |
| 56 | @Suppress("NAME_SHADOWING") |
| 57 | var retries = retries |
| 58 | // Note that exception may come from the downstream operators, we should not switch on that |
| 59 | while (true) { |
| 60 | var fromDownstream = false |
| 61 | try { |
| 62 | collect { value -> |
| 63 | try { |
| 64 | emit(value) |
| 65 | } catch (e: Throwable) { |
Vsevolod Tolstopyatov | 2a815e8 | 2019-04-08 12:06:39 +0300 | [diff] [blame] | 66 | fromDownstream = true |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 67 | throw e |
| 68 | } |
| 69 | } |
| 70 | break |
| 71 | } catch (e: Throwable) { |
Vsevolod Tolstopyatov | 997d2f2 | 2019-05-24 12:41:40 +0300 | [diff] [blame] | 72 | if (fromDownstream || e.isCancellationCause(coroutineContext)) throw e |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 73 | if (!predicate(e) || retries-- == 0) throw e |
| 74 | } |
| 75 | } |
| 76 | } |
| 77 | } |
| 78 | |
Vsevolod Tolstopyatov | 997d2f2 | 2019-05-24 12:41:40 +0300 | [diff] [blame] | 79 | private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean { |
| 80 | val job = coroutineContext[Job] |
| 81 | if (job == null || !job.isCancelled) return false |
| 82 | return unwrap(job.getCancellationException()) == unwrap(this) |
| 83 | } |
| 84 | |
Vsevolod Tolstopyatov | 61c64cc | 2019-04-12 16:05:58 +0300 | [diff] [blame] | 85 | private fun <T> Flow<T>.collectSafely(onException: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T> = |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 86 | flow { |
| 87 | // Note that exception may come from the downstream operators, we should not switch on that |
| 88 | var fromDownstream = false |
| 89 | try { |
| 90 | collect { |
| 91 | try { |
| 92 | emit(it) |
| 93 | } catch (e: Throwable) { |
| 94 | fromDownstream = true |
| 95 | throw e |
| 96 | } |
| 97 | } |
| 98 | } catch (e: Throwable) { |
Vsevolod Tolstopyatov | 997d2f2 | 2019-05-24 12:41:40 +0300 | [diff] [blame] | 99 | if (fromDownstream || e.isCancellationCause(coroutineContext)) throw e |
Vsevolod Tolstopyatov | d57bfa2 | 2019-04-04 14:25:13 +0300 | [diff] [blame] | 100 | onException(e) |
| 101 | } |
| 102 | } |