blob: 692df29685e64a6831e585e57c67ad4a0a245e98 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental.reactive
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*
import kotlinx.coroutines.experimental.sync.*
import org.reactivestreams.*
import kotlin.coroutines.experimental.*
/**
 * Builds a cold reactive [Publisher] whose items are produced by [block] running in a coroutine.
 * A fresh coroutine is started for every subscription to the returned publisher. The coroutine emits
 * items by calling `send`, and it is cancelled when the subscriber unsubscribes.
 *
 * Calls to `send` suspend as needed, both to honour the back-pressure applied by the subscriber and to
 * guarantee that `onNext` is never invoked concurrently.
 *
 * | **Coroutine action**                         | **Signal to subscriber**
 * | -------------------------------------------- | ------------------------
 * | `send`                                       | `onNext`
 * | Normal completion or `close` without cause   | `onComplete`
 * | Failure with exception or `close` with cause | `onError`
 *
 * The [context] of the new coroutine may be passed explicitly;
 * see [CoroutineDispatcher] for the standard context implementations shipped with `kotlinx.coroutines`.
 * When the [coroutineContext] of a parent coroutine is used, the [Job] of the new coroutine becomes
 * a child of the parent coroutine's job. A parent job may also be supplied directly through [parent].
 * When the context carries neither a dispatcher nor any other [ContinuationInterceptor],
 * [DefaultDispatcher] is used.
 *
 * @param context context of the coroutine. The default value is [DefaultDispatcher].
 * @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
 * @param block the coroutine code.
 */
public fun <T> publish(
    context: CoroutineContext = DefaultDispatcher,
    parent: Job? = null,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
    // One coroutine per subscription; it doubles as the Subscription given to the subscriber.
    val publisherCoroutine = PublisherCoroutine(newCoroutineContext(context, parent), subscriber)
    // Hand over the subscription before the coroutine is started, to avoid unnecessary suspensions.
    subscriber.onSubscribe(publisherCoroutine)
    publisherCoroutine.start(CoroutineStart.DEFAULT, publisherCoroutine, block)
}
/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
@JvmOverloads // keeps the JVM signatures that existed before `context` had a default and before `parent` was added
public fun <T> publish(
    context: CoroutineContext = DefaultDispatcher,
    block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> {
    // Delegate to the primary overload; `parent = null` is that overload's own default.
    return publish(context = context, parent = null, block = block)
}
// Sentinel for the requested-items counter: closed, but onComplete/onError has not been signalled yet.
private const val CLOSED: Long = -1L

// Sentinel for the requested-items counter: onComplete/onError has already been signalled to the subscriber.
private const val SIGNALLED: Long = -2L
/**
 * Coroutine that serves a single [subscriber].
 *
 * It plays three roles at once: it is the [ProducerScope] (and its own [SendChannel]) that the producing
 * block sends into, the [Subscription] handed to the subscriber, and the `onSend` select clause.
 *
 * State is kept in two places:
 *  * [mutex] serializes all `subscriber.onXXX` invocations and additionally stays locked while
 *    the outstanding demand is zero (it is created locked, because nothing has been requested yet);
 *  * [_nRequested] holds the outstanding demand, or one of the negative sentinels
 *    `CLOSED` / `SIGNALLED` once the coroutine has been cancelled or has completed.
 */
private class PublisherCoroutine<in T>(
    parentContext: CoroutineContext,
    private val subscriber: Subscriber<T>
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
    override val channel: SendChannel<T> get() = this

    // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
    private val mutex = Mutex(locked = true)

    private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)

    override val isClosedForSend: Boolean get() = isCompleted

    // Must be a getter: a property initializer would read the mutex only once, at construction time,
    // when it is still locked, and would therefore report `true` forever.
    override val isFull: Boolean get() = mutex.isLocked

    /** Closing the channel is the same as cancelling the coroutine with the given [cause]. */
    override fun close(cause: Throwable?): Boolean = cancel(cause)

    /**
     * Tries to deliver [element] without suspension.
     *
     * @return `false` when the mutex could not be acquired immediately, `true` after the element was delivered.
     */
    override fun offer(element: T): Boolean {
        if (!mutex.tryLock()) return false
        doLockedNext(element)
        return true
    }

    /** Delivers [element] to the subscriber, suspending until the mutex can be acquired. */
    public override suspend fun send(element: T) {
        // fast-path -- try send without suspension
        if (offer(element)) return
        // slow-path does suspend
        return sendSuspend(element)
    }

    // Slow path of `send`: suspends on the mutex, then delivers the element.
    private suspend fun sendSuspend(element: T) {
        mutex.lock()
        doLockedNext(element)
    }

    override val onSend: SelectClause2<T, SendChannel<T>>
        get() = this

    // registerSelectSend
    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
    override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
        // Selecting on send is selecting on the mutex; once it is acquired the element is delivered
        // and only then the user-supplied block is run.
        mutex.onLock.registerSelectClause2(select, null) {
            doLockedNext(element)
            block(this)
        }
    }

    /**
     * Passes [elem] to `subscriber.onNext` and then updates the outstanding demand.
     * On a normal return the mutex has been released, unless the demand dropped to zero,
     * in which case it is deliberately kept locked.
     *
     * Throws the coroutine's cancellation exception when the coroutine is no longer active
     * or when `onNext` itself threw (the coroutine is cancelled with that exception first).
     */
    // assert: mutex.isLocked()
    private fun doLockedNext(elem: T) {
        // check if already closed for send
        if (!isActive) {
            doLockedSignalCompleted()
            throw getCancellationException()
        }
        // notify subscriber
        try {
            subscriber.onNext(elem)
        } catch (e: Throwable) {
            try {
                // if the coroutine could not be cancelled with this exception, report it to the handler instead
                if (!cancel(e))
                    handleCoroutineException(context, e)
            } finally {
                doLockedSignalCompleted()
            }
            throw getCancellationException()
        }
        // now update nRequested
        while (true) { // lock-free loop on nRequested
            val cur = _nRequested.value
            if (cur < 0) break // closed from inside onNext => unlock
            if (cur == Long.MAX_VALUE) break // no back-pressure => unlock
            val upd = cur - 1
            if (_nRequested.compareAndSet(cur, upd)) {
                if (upd == 0L) return // return to keep locked due to back-pressure
                break // unlock if upd > 0
            }
        }
        /*
           There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
           happen after this check and before `unlock` (see `onCancellation` that does not do anything
           if it fails to acquire the lock that we are still holding).
           We have to recheck `isActive` after `unlock` anyway.
         */
        mutex.unlock()
        // recheck isActive
        if (!isActive && mutex.tryLock())
            doLockedSignalCompleted()
    }

    /**
     * Signals the terminal event to the subscriber at most once and always releases the mutex.
     * `onError` is used when the completion cause is a non-cancellation exception, `onComplete` otherwise.
     * An exception thrown by the subscriber's terminal callback is passed to `handleCoroutineException`.
     */
    // assert: mutex.isLocked()
    private fun doLockedSignalCompleted() {
        try {
            if (_nRequested.value >= CLOSED) { // i.e. anything but SIGNALLED
                _nRequested.value = SIGNALLED // we'll signal onError/onComplete (this is the final state -- no CAS needed)
                val cause = getCompletionCause()
                try {
                    if (cause != null && cause !is CancellationException)
                        subscriber.onError(cause)
                    else
                        subscriber.onComplete()
                } catch (e: Throwable) {
                    handleCoroutineException(context, e)
                }
            }
        } finally {
            mutex.unlock()
        }
    }

    /**
     * Adds [n] to the outstanding demand, saturating at [Long.MAX_VALUE] (which disables back-pressure).
     * A non-positive [n] cancels the coroutine with an [IllegalArgumentException].
     * Requests arriving after the coroutine was closed are ignored.
     */
    override fun request(n: Long) {
        if (n <= 0) {
            // Reactive Streams rule 3.9: any request with n <= 0 (zero included) must be reported as an error
            cancel(IllegalArgumentException("Must request positive number, but $n requested"))
            return
        }
        while (true) { // lock-free loop for nRequested
            val cur = _nRequested.value
            if (cur < 0) return // already closed for send, ignore requests
            var upd = cur + n
            if (upd < 0 || n == Long.MAX_VALUE) // overflow or explicit "unbounded" request
                upd = Long.MAX_VALUE
            if (cur == upd) return // nothing to do
            if (_nRequested.compareAndSet(cur, upd)) {
                // unlock the mutex when we don't have back-pressure anymore
                if (cur == 0L) {
                    mutex.unlock()
                    // recheck isActive
                    if (!isActive && mutex.tryLock())
                        doLockedSignalCompleted()
                }
                return
            }
        }
    }

    /**
     * Marks the demand counter as `CLOSED` and signals the terminal event to the subscriber
     * if the mutex can be obtained; otherwise the signal is left to whoever holds the mutex.
     */
    override fun onCancellation(cause: Throwable?) {
        while (true) { // lock-free loop for nRequested
            val cur = _nRequested.value
            if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
            check(cur >= 0) // no other thread could have marked it as CLOSED, because onCancellation is invoked once
            if (!_nRequested.compareAndSet(cur, CLOSED)) continue // retry on failed CAS
            // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
            if (cur == 0L) {
                doLockedSignalCompleted()
            } else {
                // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
                if (mutex.tryLock())
                    doLockedSignalCompleted()
                // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
            }
            return // done anyway
        }
    }

    // Subscription impl
    override fun cancel() {
        cancel(cause = null)
    }
}