| /* |
| * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| */ |
| |
| package kotlinx.coroutines.experimental.reactive |
| |
| import kotlinx.atomicfu.* |
| import kotlinx.coroutines.experimental.* |
| import kotlinx.coroutines.experimental.channels.* |
| import kotlinx.coroutines.experimental.selects.* |
| import kotlinx.coroutines.experimental.sync.* |
| import org.reactivestreams.* |
| import kotlin.coroutines.experimental.* |
| |
/**
 * Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
 * Every time the returned publisher is subscribed, it starts a new coroutine.
 * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
 *
 * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
 * `onNext` is not invoked concurrently.
 *
 * | **Coroutine action**                         | **Signal to subscriber** |
 * | -------------------------------------------- | ------------------------ |
 * | `send`                                       | `onNext`                 |
 * | Normal completion or `close` without cause   | `onComplete`             |
 * | Failure with exception or `close` with cause | `onError`                |
 *
 * The [context] for the new coroutine can be explicitly specified.
 * See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
 * The [coroutineContext] of the parent coroutine may be used,
 * in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
 * The parent job may be also explicitly specified using [parent] parameter.
 *
 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
 *
 * Subscribing with a `null` subscriber throws [NullPointerException] before any coroutine is created,
 * as required by rule 1.9 of the Reactive Streams specification.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
 * @param block the coroutine code.
 */
public fun <T> publish(
    context: CoroutineContext = DefaultDispatcher,
    parent: Job? = null,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
    // The subscriber arrives from Java as a platform type, so it may be null at runtime.
    // Fail fast with the exception type mandated by the specification instead of relying on
    // whatever exception a later use of the null reference would happen to produce.
    if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
    val newContext = newCoroutineContext(context, parent)
    val coroutine = PublisherCoroutine(newContext, subscriber)
    subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
| |
/**
 * @suppress **Deprecated**: Binary compatibility.
 *
 * Hidden two-argument overload retained only so that previously compiled callers keep linking;
 * it forwards to the primary `publish` builder without an explicit parent job.
 */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
@JvmOverloads // for binary compatibility with older code compiled before context had a default
public fun <T> publish(
    context: CoroutineContext = DefaultDispatcher,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> {
    // Spell out every argument so the call unambiguously targets the three-parameter builder.
    return publish(context = context, parent = null, block = block)
}
| |
// Sentinel values stored in PublisherCoroutine._nRequested once the coroutine is closed.
private const val CLOSED = -1L    // closed, but have not signalled onComplete/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onComplete/onError

/**
 * Coroutine created by `publish` for a single subscription.
 *
 * The same object plays three roles: it is the [ProducerScope] / [SendChannel] the user block sends into
 * ([channel] returns `this`), the [Subscription] handed to the [subscriber] via `onSubscribe`,
 * and the select clause returned by [onSend].
 *
 * Coordination is built from two pieces of state:
 *  * [mutex] -- held while there is no outstanding demand (`nRequested == 0`) or while a `subscriber.onXXX`
 *    callback is being invoked, so that signals to the subscriber are never delivered concurrently;
 *  * [_nRequested] -- the outstanding demand, or a negative sentinel ([CLOSED] / [SIGNALLED]) once closed.
 *
 * @param parentContext context passed to [AbstractCoroutine].
 * @param subscriber the subscriber that receives `onNext` / `onComplete` / `onError`.
 */
private class PublisherCoroutine<in T>(
    parentContext: CoroutineContext,
    private val subscriber: Subscriber<T>
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
    override val channel: SendChannel<T> get() = this

    // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
    private val mutex = Mutex(locked = true)

    private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)

    override val isClosedForSend: Boolean get() = isCompleted

    // Must be a getter: a property initializer would capture the mutex state once at construction time
    // (when the mutex is created locked) and would then report `true` forever.
    override val isFull: Boolean get() = mutex.isLocked

    override fun close(cause: Throwable?): Boolean = cancel(cause)

    override fun invokeOnClose(handler: (Throwable?) -> Unit) =
        throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")

    /**
     * Tries to deliver [element] without suspending.
     *
     * @return `false` if the mutex could not be acquired immediately (no demand or a signal is in progress),
     * `true` if the element was passed to [doLockedNext].
     */
    override fun offer(element: T): Boolean {
        if (!mutex.tryLock()) return false
        doLockedNext(element)
        return true
    }

    /**
     * Delivers [element] to the subscriber, suspending until the mutex can be acquired
     * when the non-suspending [offer] attempt fails.
     */
    public override suspend fun send(element: T) {
        // fast-path -- try send without suspension
        if (offer(element)) return
        // slow-path does suspend
        return sendSuspend(element)
    }

    // Slow path of send: suspends on the mutex, then delivers the element while holding it.
    private suspend fun sendSuspend(element: T) {
        mutex.lock()
        doLockedNext(element)
    }

    override val onSend: SelectClause2<T, SendChannel<T>>
        get() = this

    // registerSelectSend
    // Implemented on top of the mutex's own select clause: once the lock is acquired inside select,
    // the element is delivered and then the user's block is run.
    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
    override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
        mutex.onLock.registerSelectClause2(select, null) {
            doLockedNext(element)
            block(this)
        }
    }

    /**
     * Passes [elem] to `subscriber.onNext` and then accounts for the consumed demand.
     *
     * On return the mutex is either released, or deliberately kept locked because demand dropped to zero.
     * Throws the coroutine's cancellation exception if the coroutine is no longer active
     * or if `onNext` itself throws.
     */
    // assert: mutex.isLocked()
    private fun doLockedNext(elem: T) {
        // check if already closed for send
        if (!isActive) {
            doLockedSignalCompleted()
            throw getCancellationException()
        }
        // notify subscriber
        try {
            subscriber.onNext(elem)
        } catch (e: Throwable) {
            try {
                // if cancel reports false, the exception is handed to the context's exception handling instead
                if (!cancel(e))
                    handleCoroutineException(context, e)
            } finally {
                doLockedSignalCompleted()
            }
            throw getCancellationException()
        }
        // now update nRequested
        while (true) { // lock-free loop on nRequested
            val cur = _nRequested.value
            if (cur < 0) break // closed from inside onNext => unlock
            if (cur == Long.MAX_VALUE) break // no back-pressure => unlock
            val upd = cur - 1
            if (_nRequested.compareAndSet(cur, upd)) {
                if (upd == 0L) return // return to keep locked due to back-pressure
                break // unlock if upd > 0
            }
        }
        /*
           There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
           happen after this check and before `unlock` (see `onCancellation` that does not do anything
           if it fails to acquire the lock that we are still holding).
           We have to recheck `isActive` after `unlock` anyway.
         */
        mutex.unlock()
        // recheck isActive
        if (!isActive && mutex.tryLock())
            doLockedSignalCompleted()
    }

    /**
     * Emits the terminal signal to the subscriber at most once and always releases the mutex.
     *
     * `onError` is used when the completion cause is non-null and is not a [CancellationException];
     * otherwise `onComplete` is used. Exceptions thrown by the subscriber's terminal callback are
     * passed to [handleCoroutineException].
     */
    // assert: mutex.isLocked()
    private fun doLockedSignalCompleted() {
        try {
            if (_nRequested.value >= CLOSED) {
                _nRequested.value = SIGNALLED // we'll signal onError/onComplete (that's the final state -- no CAS needed)
                val cause = getCompletionCause()
                try {
                    if (cause != null && cause !is CancellationException)
                        subscriber.onError(cause)
                    else
                        subscriber.onComplete()
                } catch (e: Throwable) {
                    handleCoroutineException(context, e)
                }
            }
        } finally {
            mutex.unlock()
        }
    }

    /**
     * [Subscription.request] implementation: adds [n] to the outstanding demand, saturating at
     * [Long.MAX_VALUE] (which [doLockedNext] treats as "no back-pressure").
     *
     * A non-positive [n] cancels the coroutine with an [IllegalArgumentException]; rule 3.9 of the
     * Reactive Streams specification requires that `n <= 0` (including zero) is reported as an error.
     */
    override fun request(n: Long) {
        if (n <= 0) {
            cancel(IllegalArgumentException("Must request positive number, but $n requested"))
            return
        }
        while (true) { // lock-free loop for nRequested
            val cur = _nRequested.value
            if (cur < 0) return // already closed for send, ignore requests
            var upd = cur + n
            if (upd < 0 || n == Long.MAX_VALUE) // overflow or explicit "unbounded" request
                upd = Long.MAX_VALUE
            if (cur == upd) return // nothing to do
            if (_nRequested.compareAndSet(cur, upd)) {
                // unlock the mutex when we don't have back-pressure anymore
                if (cur == 0L) {
                    mutex.unlock()
                    // recheck isActive
                    if (!isActive && mutex.tryLock())
                        doLockedSignalCompleted()
                }
                return
            }
        }
    }

    /**
     * Marks the demand counter as [CLOSED] and, if the mutex can be obtained, emits the terminal signal.
     * When the mutex is held by a concurrent `onNext`, signalling is left to [doLockedNext].
     */
    override fun onCancellation(cause: Throwable?) {
        while (true) { // lock-free loop for nRequested
            val cur = _nRequested.value
            if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
            check(cur >= 0) // no other thread could have marked it as CLOSED, because onCancellation is invoked once
            if (!_nRequested.compareAndSet(cur, CLOSED)) continue // retry on failed CAS
            // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
            if (cur == 0L) {
                doLockedSignalCompleted()
            } else {
                // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
                if (mutex.tryLock())
                    doLockedSignalCompleted()
                // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
            }
            return // done anyway
        }
    }

    // Subscription impl
    override fun cancel() {
        cancel(cause = null)
    }
}