blob: 692df29685e64a6831e585e57c67ad4a0a245e98 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.reactive
6
Roman Elizarov2adf8bc2018-01-24 20:09:57 +03007import kotlinx.atomicfu.*
Roman Elizarovc0d559b2017-09-28 14:27:05 +03008import kotlinx.coroutines.experimental.*
Roman Elizarov2adf8bc2018-01-24 20:09:57 +03009import kotlinx.coroutines.experimental.channels.*
10import kotlinx.coroutines.experimental.selects.*
11import kotlinx.coroutines.experimental.sync.*
12import org.reactivestreams.*
13import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030014
15/**
16 * Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
Roman Elizarovc0d559b2017-09-28 14:27:05 +030017 * Every time the returned publisher is subscribed, it starts a new coroutine.
Roman Elizarov331750b2017-02-15 17:59:17 +030018 * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
19 *
20 * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
21 * `onNext` is not invoked concurrently.
22 *
23 * | **Coroutine action** | **Signal to subscriber**
24 * | -------------------------------------------- | ------------------------
25 * | `send` | `onNext`
26 * | Normal completion or `close` without cause | `onComplete`
27 * | Failure with exception or `close` with cause | `onError`
Roman Elizarovc0d559b2017-09-28 14:27:05 +030028 *
29 * The [context] for the new coroutine can be explicitly specified.
30 * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
Roman Elizarovc7d10a42018-03-13 18:28:42 +030031 * The [coroutineContext] of the parent coroutine may be used,
Roman Elizarovc0d559b2017-09-28 14:27:05 +030032 * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
Roman Elizarove8f694e2017-11-28 10:12:00 +030033 * The parent job may be also explicitly specified using [parent] parameter.
34
Roman Elizarovc0d559b2017-09-28 14:27:05 +030035 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
36 *
37 * @param context context of the coroutine. The default value is [DefaultDispatcher].
Roman Elizarove8f694e2017-11-28 10:12:00 +030038 * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
Roman Elizarovc0d559b2017-09-28 14:27:05 +030039 * @param block the coroutine code.
Roman Elizarov331750b2017-02-15 17:59:17 +030040 */
Roman Elizarove8f694e2017-11-28 10:12:00 +030041public fun <T> publish(
42 context: CoroutineContext = DefaultDispatcher,
43 parent: Job? = null,
44 block: suspend ProducerScope<T>.() -> Unit
45): Publisher<T> = Publisher { subscriber ->
46 val newContext = newCoroutineContext(context, parent)
47 val coroutine = PublisherCoroutine(newContext, subscriber)
Roman Elizarove8f694e2017-11-28 10:12:00 +030048 subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
Roman Elizarov2adf8bc2018-01-24 20:09:57 +030049 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
Roman Elizarove8f694e2017-11-28 10:12:00 +030050}
51
52/** @suppress **Deprecated**: Binary compatibility */
53@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
Roman Elizarovc0d559b2017-09-28 14:27:05 +030054@JvmOverloads // for binary compatibility with older code compiled before context had a default
Roman Elizarov331750b2017-02-15 17:59:17 +030055public fun <T> publish(
Roman Elizarovc0d559b2017-09-28 14:27:05 +030056 context: CoroutineContext = DefaultDispatcher,
Roman Elizarov331750b2017-02-15 17:59:17 +030057 block: suspend ProducerScope<T>.() -> Unit
Roman Elizarove8f694e2017-11-28 10:12:00 +030058): Publisher<T> =
59 publish(context, block = block)
Roman Elizarov331750b2017-02-15 17:59:17 +030060
Roman Elizarov7753f8e2017-08-15 11:16:33 +030061private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
62private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
63
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030064private class PublisherCoroutine<in T>(
Roman Elizarov2b12d582017-06-22 20:12:19 +030065 parentContext: CoroutineContext,
Roman Elizarov331750b2017-02-15 17:59:17 +030066 private val subscriber: Subscriber<T>
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030067) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
Roman Elizarov331750b2017-02-15 17:59:17 +030068 override val channel: SendChannel<T> get() = this
69
70 // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
71 private val mutex = Mutex(locked = true)
72
Roman Elizarov7753f8e2017-08-15 11:16:33 +030073 private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
Roman Elizarov331750b2017-02-15 17:59:17 +030074
75 override val isClosedForSend: Boolean get() = isCompleted
76 override val isFull: Boolean = mutex.isLocked
77 override fun close(cause: Throwable?): Boolean = cancel(cause)
78
Roman Elizarov331750b2017-02-15 17:59:17 +030079 override fun offer(element: T): Boolean {
80 if (!mutex.tryLock()) return false
81 doLockedNext(element)
82 return true
83 }
84
Roman Elizarov6640b2b2018-01-17 19:08:55 +030085 public override suspend fun send(element: T) {
Roman Elizarov331750b2017-02-15 17:59:17 +030086 // fast-path -- try send without suspension
87 if (offer(element)) return
88 // slow-path does suspend
89 return sendSuspend(element)
90 }
91
92 private suspend fun sendSuspend(element: T) {
93 mutex.lock()
94 doLockedNext(element)
95 }
96
Roman Elizarovdb0e22d2017-08-29 18:15:57 +030097 override val onSend: SelectClause2<T, SendChannel<T>>
98 get() = this
99
100 // registerSelectSend
101 @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
102 override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
103 mutex.onLock.registerSelectClause2(select, null) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300104 doLockedNext(element)
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300105 block(this)
Roman Elizarov331750b2017-02-15 17:59:17 +0300106 }
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300107 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300108
109 // assert: mutex.isLocked()
110 private fun doLockedNext(elem: T) {
111 // check if already closed for send
Roman Elizarov29affbb2017-07-21 13:47:41 +0300112 if (!isActive) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300113 doLockedSignalCompleted()
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300114 throw getCancellationException()
Roman Elizarov331750b2017-02-15 17:59:17 +0300115 }
116 // notify subscriber
117 try {
118 subscriber.onNext(elem)
119 } catch (e: Throwable) {
120 try {
121 if (!cancel(e))
Roman Elizarov9fe5f462018-02-21 19:05:52 +0300122 handleCoroutineException(context, e)
Roman Elizarov331750b2017-02-15 17:59:17 +0300123 } finally {
124 doLockedSignalCompleted()
125 }
Roman Elizarov8b38fa22017-09-27 17:44:31 +0300126 throw getCancellationException()
Roman Elizarov331750b2017-02-15 17:59:17 +0300127 }
128 // now update nRequested
129 while (true) { // lock-free loop on nRequested
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300130 val cur = _nRequested.value
Roman Elizarov331750b2017-02-15 17:59:17 +0300131 if (cur < 0) break // closed from inside onNext => unlock
132 if (cur == Long.MAX_VALUE) break // no back-pressure => unlock
133 val upd = cur - 1
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300134 if (_nRequested.compareAndSet(cur, upd)) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300135 if (upd == 0L) return // return to keep locked due to back-pressure
136 break // unlock if upd > 0
137 }
138 }
139 /*
Roman Elizarov29affbb2017-07-21 13:47:41 +0300140 There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
141 happen after this check and before `unlock` (see `onCancellation` that does not do anything
Roman Elizarov331750b2017-02-15 17:59:17 +0300142 if it fails to acquire the lock that we are still holding).
Roman Elizarov29affbb2017-07-21 13:47:41 +0300143 We have to recheck `isActive` after `unlock` anyway.
Roman Elizarov331750b2017-02-15 17:59:17 +0300144 */
145 mutex.unlock()
Roman Elizarov29affbb2017-07-21 13:47:41 +0300146 // recheck isActive
147 if (!isActive && mutex.tryLock())
Roman Elizarov331750b2017-02-15 17:59:17 +0300148 doLockedSignalCompleted()
149 }
150
151 // assert: mutex.isLocked()
152 private fun doLockedSignalCompleted() {
153 try {
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300154 if (_nRequested.value >= CLOSED) {
155 _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
Roman Elizarov3ef4fca2017-07-12 12:42:20 +0300156 val cause = getCompletionCause()
Roman Elizarov331750b2017-02-15 17:59:17 +0300157 try {
Roman Elizarove89cd682018-04-25 13:03:40 +0300158 if (cause != null && cause !is CancellationException)
Roman Elizarov3ef4fca2017-07-12 12:42:20 +0300159 subscriber.onError(cause)
Roman Elizarov331750b2017-02-15 17:59:17 +0300160 else
161 subscriber.onComplete()
162 } catch (e: Throwable) {
Roman Elizarov9fe5f462018-02-21 19:05:52 +0300163 handleCoroutineException(context, e)
Roman Elizarov331750b2017-02-15 17:59:17 +0300164 }
165 }
166 } finally {
167 mutex.unlock()
168 }
169 }
170
171 override fun request(n: Long) {
172 if (n < 0) {
173 cancel(IllegalArgumentException("Must request non-negative number, but $n requested"))
174 return
175 }
176 while (true) { // lock-free loop for nRequested
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300177 val cur = _nRequested.value
Roman Elizarov331750b2017-02-15 17:59:17 +0300178 if (cur < 0) return // already closed for send, ignore requests
179 var upd = cur + n
180 if (upd < 0 || n == Long.MAX_VALUE)
181 upd = Long.MAX_VALUE
182 if (cur == upd) return // nothing to do
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300183 if (_nRequested.compareAndSet(cur, upd)) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300184 // unlock the mutex when we don't have back-pressure anymore
Roman Elizarovd14a3902017-03-13 12:03:35 +0300185 if (cur == 0L) {
186 mutex.unlock()
Roman Elizarov29affbb2017-07-21 13:47:41 +0300187 // recheck isActive
188 if (!isActive && mutex.tryLock())
Roman Elizarovd14a3902017-03-13 12:03:35 +0300189 doLockedSignalCompleted()
190 }
Roman Elizarov331750b2017-02-15 17:59:17 +0300191 return
192 }
193 }
194 }
195
Roman Elizarov6640b2b2018-01-17 19:08:55 +0300196 override fun onCancellation(cause: Throwable?) {
Roman Elizarov331750b2017-02-15 17:59:17 +0300197 while (true) { // lock-free loop for nRequested
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300198 val cur = _nRequested.value
Roman Elizarov29affbb2017-07-21 13:47:41 +0300199 if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
Roman Elizarov3ef4fca2017-07-12 12:42:20 +0300200 check(cur >= 0) // no other thread could have marked it as CLOSED, because onCancellation is invoked once
Roman Elizarov7753f8e2017-08-15 11:16:33 +0300201 if (!_nRequested.compareAndSet(cur, CLOSED)) continue // retry on failed CAS
Roman Elizarov331750b2017-02-15 17:59:17 +0300202 // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
203 if (cur == 0L) {
204 doLockedSignalCompleted()
205 } else {
206 // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
207 if (mutex.tryLock())
208 doLockedSignalCompleted()
209 // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
210 }
211 return // done anyway
212 }
213 }
214
215 // Subscription impl
216 override fun cancel() {
217 cancel(cause = null)
218 }
219}