blob: 55f1e361152ef59841647b7cd1362b9bbd098ea3 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.rx2
18
Roman Elizarov2adf8bc2018-01-24 20:09:57 +030019import io.reactivex.*
20import io.reactivex.functions.*
21import kotlinx.atomicfu.*
Roman Elizarovc0d559b2017-09-28 14:27:05 +030022import kotlinx.coroutines.experimental.*
Roman Elizarov2adf8bc2018-01-24 20:09:57 +030023import kotlinx.coroutines.experimental.channels.*
24import kotlinx.coroutines.experimental.selects.*
25import kotlinx.coroutines.experimental.sync.*
26import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030027
28/**
29 * Creates cold [observable][Observable] that will run a given [block] in a coroutine.
Roman Elizarovc0d559b2017-09-28 14:27:05 +030030 * Every time the returned observable is subscribed, it starts a new coroutine.
Roman Elizarov331750b2017-02-15 17:59:17 +030031 * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
32 *
Roman Elizarov50e32212017-03-10 17:40:50 +030033 * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
34 * Note, that Rx 2.x [Observable] **does not support backpressure**. Use [rxFlowable].
Roman Elizarov331750b2017-02-15 17:59:17 +030035 *
36 * | **Coroutine action** | **Signal to subscriber**
37 * | -------------------------------------------- | ------------------------
38 * | `send` | `onNext`
39 * | Normal completion or `close` without cause | `onComplete`
40 * | Failure with exception or `close` with cause | `onError`
Roman Elizarovc0d559b2017-09-28 14:27:05 +030041 *
42 * The [context] for the new coroutine can be explicitly specified.
43 * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
Roman Elizarovc7d10a42018-03-13 18:28:42 +030044 * The [coroutineContext] of the parent coroutine may be used,
Roman Elizarovc0d559b2017-09-28 14:27:05 +030045 * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
Roman Elizarove8f694e2017-11-28 10:12:00 +030046 * The parent job may be also explicitly specified using [parent] parameter.
47 *
Roman Elizarovc0d559b2017-09-28 14:27:05 +030048 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
49 *
50 * @param context context of the coroutine. The default value is [DefaultDispatcher].
Roman Elizarove8f694e2017-11-28 10:12:00 +030051 * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
Roman Elizarovc0d559b2017-09-28 14:27:05 +030052 * @param block the coroutine code.
Roman Elizarov331750b2017-02-15 17:59:17 +030053 */
Roman Elizarove8f694e2017-11-28 10:12:00 +030054public fun <T> rxObservable(
55 context: CoroutineContext = DefaultDispatcher,
56 parent: Job? = null,
57 block: suspend ProducerScope<T>.() -> Unit
58): Observable<T> = Observable.create { subscriber ->
59 val newContext = newCoroutineContext(context, parent)
60 val coroutine = RxObservableCoroutine(newContext, subscriber)
Roman Elizarove8f694e2017-11-28 10:12:00 +030061 subscriber.setCancellable(coroutine) // do it first (before starting coroutine), to await unnecessary suspensions
Roman Elizarov2adf8bc2018-01-24 20:09:57 +030062 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
Roman Elizarove8f694e2017-11-28 10:12:00 +030063}
64
65/** @suppress **Deprecated**: Binary compatibility */
66@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
Roman Elizarovc0d559b2017-09-28 14:27:05 +030067@JvmOverloads // for binary compatibility with older code compiled before context had a default
Roman Elizarov331750b2017-02-15 17:59:17 +030068public fun <T> rxObservable(
Roman Elizarovc0d559b2017-09-28 14:27:05 +030069 context: CoroutineContext = DefaultDispatcher,
Roman Elizarov331750b2017-02-15 17:59:17 +030070 block: suspend ProducerScope<T>.() -> Unit
Roman Elizarove8f694e2017-11-28 10:12:00 +030071): Observable<T> =
72 rxObservable(context, block = block)
Roman Elizarov331750b2017-02-15 17:59:17 +030073
Roman Elizarov7753f8e2017-08-15 11:16:33 +030074private const val OPEN = 0 // open channel, still working
75private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
76private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
77
Roman Elizarov331750b2017-02-15 17:59:17 +030078private class RxObservableCoroutine<T>(
Roman Elizarov2b12d582017-06-22 20:12:19 +030079 parentContext: CoroutineContext,
Roman Elizarov331750b2017-02-15 17:59:17 +030080 private val subscriber: ObservableEmitter<T>
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030081) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Cancellable, SelectClause2<T, SendChannel<T>> {
Roman Elizarov331750b2017-02-15 17:59:17 +030082 override val channel: SendChannel<T> get() = this
83
84 // Mutex is locked when while subscriber.onXXX is being invoked
85 private val mutex = Mutex()
86
Roman Elizarov7753f8e2017-08-15 11:16:33 +030087 private val _signal = atomic(OPEN)
Roman Elizarov331750b2017-02-15 17:59:17 +030088
89 override val isClosedForSend: Boolean get() = isCompleted
90 override val isFull: Boolean = mutex.isLocked
91 override fun close(cause: Throwable?): Boolean = cancel(cause)
92
Roman Elizarov331750b2017-02-15 17:59:17 +030093 override fun offer(element: T): Boolean {
94 if (!mutex.tryLock()) return false
95 doLockedNext(element)
96 return true
97 }
98
Roman Elizarov6640b2b2018-01-17 19:08:55 +030099 public override suspend fun send(element: T) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300100 // fast-path -- try send without suspension
101 if (offer(element)) return
102 // slow-path does suspend
103 return sendSuspend(element)
104 }
105
106 private suspend fun sendSuspend(element: T) {
107 mutex.lock()
108 doLockedNext(element)
109 }
110
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300111 override val onSend: SelectClause2<T, SendChannel<T>>
112 get() = this
113
114 // registerSelectSend
115 @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
116 override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
117 mutex.onLock.registerSelectClause2(select, null) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300118 doLockedNext(element)
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300119 block(this)
Roman Elizarov331750b2017-02-15 17:59:17 +0300120 }
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300121 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300122
123 // assert: mutex.isLocked()
124 private fun doLockedNext(elem: T) {
125 // check if already closed for send
Roman Elizarov29affbb2017-07-21 13:47:41 +0300126 if (!isActive) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300127 doLockedSignalCompleted()
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300128 throw getCancellationException()
Roman Elizarov331750b2017-02-15 17:59:17 +0300129 }
130 // notify subscriber
131 try {
132 subscriber.onNext(elem)
133 } catch (e: Throwable) {
134 try {
135 if (!cancel(e))
Roman Elizarov9fe5f462018-02-21 19:05:52 +0300136 handleCoroutineException(context, e)
Roman Elizarov331750b2017-02-15 17:59:17 +0300137 } finally {
138 doLockedSignalCompleted()
139 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300140 throw getCancellationException()
Roman Elizarov331750b2017-02-15 17:59:17 +0300141 }
142 /*
Roman Elizarov29affbb2017-07-21 13:47:41 +0300143 There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
144 happen after this check and before `unlock` (see `onCancellation` that does not do anything
Roman Elizarov331750b2017-02-15 17:59:17 +0300145 if it fails to acquire the lock that we are still holding).
146 We have to recheck `isCompleted` after `unlock` anyway.
147 */
148 mutex.unlock()
Roman Elizarov29affbb2017-07-21 13:47:41 +0300149 // recheck isActive
150 if (!isActive && mutex.tryLock())
Roman Elizarov331750b2017-02-15 17:59:17 +0300151 doLockedSignalCompleted()
152 }
153
154 // assert: mutex.isLocked()
155 private fun doLockedSignalCompleted() {
156 try {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300157 if (_signal.value >= CLOSED) {
158 _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
Roman Elizarov3ef4fca2017-07-12 12:42:20 +0300159 val cause = getCompletionCause()
Roman Elizarov331750b2017-02-15 17:59:17 +0300160 try {
Roman Elizarove89cd682018-04-25 13:03:40 +0300161 if (cause != null && cause !is CancellationException)
Roman Elizarov3ef4fca2017-07-12 12:42:20 +0300162 subscriber.onError(cause)
Roman Elizarov331750b2017-02-15 17:59:17 +0300163 else
164 subscriber.onComplete()
165 } catch (e: Throwable) {
Roman Elizarov9fe5f462018-02-21 19:05:52 +0300166 handleCoroutineException(context, e)
Roman Elizarov331750b2017-02-15 17:59:17 +0300167 }
168 }
169 } finally {
170 mutex.unlock()
171 }
172 }
173
Roman Elizarov6640b2b2018-01-17 19:08:55 +0300174 override fun onCancellation(cause: Throwable?) {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300175 if (!_signal.compareAndSet(OPEN, CLOSED)) return // abort, other thread invoked doLockedSignalCompleted
Roman Elizarov331750b2017-02-15 17:59:17 +0300176 if (mutex.tryLock()) // if we can acquire the lock
177 doLockedSignalCompleted()
178 }
179
180 // Cancellable impl
181 override fun cancel() { cancel(cause = null) }
182}