blob: b92ced6ab78c452959d387558f6ce0427e3108f4 [file] [log] [blame]
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.native.concurrent.*
/**
* Abstract send channel. It is a base class for all send channel implementations.
*/
internal abstract class AbstractSendChannel<E>(
@JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
) : SendChannel<E> {
/**
* Head of the lock-free linked list that backs this channel.
* Sender nodes, buffered elements, receiver nodes and the [Closed] token are all linked into this list
* by the functions of this class and its subclasses.
* @suppress **This is unstable API and it is subject to change.**
*/
protected val queue = LockFreeLinkedListHead()
// ------ extension points for buffered channels ------
/**
* Returns `true` if [isBufferFull] is always `true`.
* When it is `true`, [enqueueSend] appends the sender without re-checking [isBufferFull].
* @suppress **This is unstable API and it is subject to change.**
*/
protected abstract val isBufferAlwaysFull: Boolean
/**
* Returns `true` if this channel's buffer is full.
* This operation should be atomic if it is invoked by [enqueueSend].
* It is also read (together with the head of the queue) to decide whether a sender has to be queued.
* @suppress **This is unstable API and it is subject to change.**
*/
protected abstract val isBufferFull: Boolean
// Holds the handler registered via invokeOnClose.
// State transitions: null -> handler -> HANDLER_INVOKED (every transition is made with compareAndSet)
private val onCloseHandler = atomic<Any?>(null)
// ------ internal functions for override by buffered channels ------
/**
 * Tries to add element to buffer or to queued receiver.
 * This base implementation only looks for a queued receiver: it keeps taking the first receiver
 * (or peeking the closed token) until one of them accepts the element or none is left.
 * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerInternal(element: E): Any {
    while (true) {
        val receiver = takeFirstReceiveOrPeekClosed()
        if (receiver == null) return OFFER_FAILED
        val resumeToken = receiver.tryResumeReceive(element, null)
        // no token -- this receiver could not be resumed, try the next node in the queue
        if (resumeToken == null) continue
        assert { resumeToken === RESUME_TOKEN }
        receiver.completeResumeReceive(element)
        return receiver.offerResult
    }
}
/**
 * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
 * The receiver is taken from the queue by an atomic operation performed together with the selection.
 * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
    // offer atomically with select
    val tryOffer = describeTryOffer(element)
    // a non-null outcome of the atomic operation is a failure that is reported to the caller as is
    select.performAtomicTrySelect(tryOffer)?.let { failure -> return failure }
    val receiver = tryOffer.result
    receiver.completeResumeReceive(element)
    return receiver.offerResult
}
// ------ state functions & helpers for concrete implementations ------
/**
* Returns non-null closed token if it is last in the queue.
* Reading this property is not side-effect free: when a token is found, `helpClose` is invoked on it.
* @suppress **This is unstable API and it is subject to change.**
*/
protected val closedForSend: Closed<*>? get() = (queue.prevNode as? Closed<*>)?.also { helpClose(it) }
/**
* Returns non-null closed token if it is first in the queue.
* Reading this property is not side-effect free: when a token is found, `helpClose` is invoked on it.
* @suppress **This is unstable API and it is subject to change.**
*/
protected val closedForReceive: Closed<*>? get() = (queue.nextNode as? Closed<*>)?.also { helpClose(it) }
/**
* Retrieves first sending waiter from the queue or returns closed token.
* Callers in this file treat a `null` result as "nothing to poll".
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun takeFirstSendOrPeekClosed(): Send? =
queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
/**
* Queues buffered element, returns null on success or
* returns node reference if it was already closed or is waiting for receive.
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
queue.addLastIfPrev(SendBuffered(element)) { prev ->
// non-local return: leaves sendBuffered itself with the conflicting last node, nothing is added
if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
true
}
return null
}
/**
* Creates a descriptor that appends a [SendBuffered] node carrying [element] to the end of [queue].
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
/**
 * Add-last descriptor for a [SendBuffered] node wrapping the given element.
 * The addition fails when the affected node is the closed token (which is reported as is)
 * or any other [ReceiveOrClosed] node (reported as `OFFER_FAILED`).
 */
private open class SendBufferedDesc<E>(
    queue: LockFreeLinkedListHead,
    element: E
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
    // The Closed check is deliberately performed before the more general ReceiveOrClosed check
    override fun failure(affected: LockFreeLinkedListNode): Any? =
        if (affected is Closed<*>) affected
        else if (affected is ReceiveOrClosed<*>) OFFER_FAILED
        else null
}
// ------ SendChannel ------
// Closed for send as soon as the closed token is the last node of the queue (see closedForSend)
public final override val isClosedForSend: Boolean get() = closedForSend != null
// A sender has to be queued only when neither a receiver nor the closed token is first in the queue
// and the buffer reports that it is full
private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
/**
* Sends [element], suspending only when it cannot be handed over right away.
* The non-suspending [offerInternal] is tried first; every outcome other than `OFFER_SUCCESS`
* (including a closed token) is left to `sendSuspend`.
*/
public final override suspend fun send(element: E) {
// fast path -- try offer non-blocking
if (offerInternal(element) === OFFER_SUCCESS) return
// slow-path does suspend or throws exception
return sendSuspend(element)
}
/**
* Deprecated `offer` entry point kept for migration purposes.
* Delegates to `super.offer` and, if that throws, reports [element] to [onUndeliveredElement]
* before rethrowing.
*/
@Suppress("DEPRECATION", "DEPRECATION_ERROR")
override fun offer(element: E): Boolean {
// Temporary migration for offer users who rely on onUndeliveredElement
try {
return super.offer(element)
} catch (e: Throwable) {
onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
// If it crashes, add send exception as suppressed for better diagnostics
it.addSuppressed(e)
throw it
}
// the handler is absent or completed normally -- propagate the original failure
throw e
}
}
/**
* Non-suspending send. Maps the outcome of [offerInternal] to a [ChannelResult]:
* success, plain failure (nobody can take the element right now), or closed with the send exception.
*/
public final override fun trySend(element: E): ChannelResult<Unit> {
val result = offerInternal(element)
return when {
result === OFFER_SUCCESS -> ChannelResult.success(Unit)
result === OFFER_FAILED -> {
// We should check for closed token on trySend as well, otherwise trySend won't be linearizable
// in the face of concurrent close()
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
val closedForSend = closedForSend ?: return ChannelResult.failure()
ChannelResult.closed(helpCloseAndGetSendException(closedForSend))
}
result is Closed<*> -> {
ChannelResult.closed(helpCloseAndGetSendException(result))
}
// offerInternal is documented to return only the three kinds of values handled above
else -> error("trySend returned $result")
}
}
/**
* Helps to complete the close operation for the observed [closed] token and
* returns the exception that a send on the closed channel fails with.
*/
private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
helpClose(closed)
return closed.sendException
}
/**
* Helps to complete the close operation for [closed] and reports [element] as undelivered.
* Returns the send exception of [closed]. If the undelivered-element handler itself fails, that failure
* is thrown instead, with the send exception attached to it as suppressed.
*/
private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
// To ensure linearizability we must ALWAYS help close the channel when we observe that it was closed
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
helpClose(closed)
// Element was not delivered -> call onUndeliveredElement
onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
// If it crashes, add send exception as suppressed for better diagnostics
it.addSuppressed(closed.sendException)
throw it
}
return closed.sendException
}
/**
* Slow path of [send]. Alternates between queueing a waiting sender node (when the channel looks full)
* and retrying [offerInternal], until the sender is queued, the element is accepted,
* or the channel turns out to be closed.
*/
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
loop@ while (true) {
if (isFullImpl) {
// use the node flavour that knows about the undelivered-element handler only when one is installed
val send = if (onUndeliveredElement == null)
SendElement(element, cont) else
SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
val enqueueResult = enqueueSend(send)
when {
enqueueResult == null -> { // enqueued successfully
// NOTE(review): presumably arranges for the node to be removed when cont is cancelled -- confirm
cont.removeOnCancellation(send)
return@sc
}
enqueueResult is Closed<*> -> {
cont.helpCloseAndResumeWithSendException(element, enqueueResult)
return@sc
}
enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
enqueueResult is Receive<*> -> {} // try to offer instead
else -> error("enqueueSend returned $enqueueResult")
}
}
// hm... receiver is waiting or buffer is not full. try to offer
val offerResult = offerInternal(element)
when {
offerResult === OFFER_SUCCESS -> {
cont.resume(Unit)
return@sc
}
// the state has changed again in between -- start over from the fullness check
offerResult === OFFER_FAILED -> continue@loop
offerResult is Closed<*> -> {
cont.helpCloseAndResumeWithSendException(element, offerResult)
return@sc
}
else -> error("offerInternal returned $offerResult")
}
}
}
/**
* Helps to complete the close operation for [closed], reports [element] as undelivered and resumes
* this continuation with an exception: the send exception of [closed], or the failure of the
* undelivered-element handler with the send exception attached to it as suppressed.
*/
private fun Continuation<*>.helpCloseAndResumeWithSendException(element: E, closed: Closed<*>) {
helpClose(closed)
val sendException = closed.sendException
onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
it.addSuppressed(sendException)
resumeWithException(it)
// the continuation was already resumed with the handler failure, do not resume it twice
return
}
resumeWithException(sendException)
}
/**
* Tries to append the waiting sender [send] to the end of the queue.
* Result is:
* * null -- successfully enqueued
* * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
* * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
*/
protected open fun enqueueSend(send: Send): Any? {
if (isBufferAlwaysFull) {
// no buffer state to check, only the current last node matters
queue.addLastIfPrev(send) { prev ->
// non-local return: hands the conflicting last node to the caller of enqueueSend
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
true
}
} else {
// in addition to the last node, the buffer must still be full for the addition to go through
if (!queue.addLastIfPrevAndIf(send, { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
true
}, { isBufferFull }))
return ENQUEUE_FAILED
}
return null
}
/**
* Closes this channel by appending a [Closed] token with the given [cause] to the end of the queue.
* Returns `true` if this invocation has added the token, `false` if the queue already ended with one.
* In both cases pending receivers are closed via `helpClose`; the close handler is invoked
* only by the invocation that has added the token.
*/
public override fun close(cause: Throwable?): Boolean {
val closed = Closed<E>(cause)
/*
* Try to commit close by adding a close token to the end of the queue.
* Successful -> we're now responsible for closing receivers
* Not successful -> help closing pending receivers to maintain invariant
* "if (!close()) next send will throw"
*/
val closeAdded = queue.addLastIfPrev(closed) { it !is Closed<*> }
// when the addition was rejected, the last node is the token of the close that came first
val actuallyClosed = if (closeAdded) closed else queue.prevNode as Closed<*>
helpClose(actuallyClosed)
if (closeAdded) invokeOnCloseHandler(cause)
return closeAdded // true if we have closed
}
/**
* Invokes the registered close handler with [cause], if there is one that was not invoked yet.
* The handler slot is moved to `HANDLER_INVOKED` with a CAS before the call, so that
* the handler is invoked at most once even when racing with [invokeOnClose].
*/
private fun invokeOnCloseHandler(cause: Throwable?) {
val handler = onCloseHandler.value
if (handler !== null && handler !== HANDLER_INVOKED
&& onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
// We get here only when the CAS has succeeded;
// a failed CAS means that a concurrent invokeOnClose() has already invoked the handler
@Suppress("UNCHECKED_CAST")
(handler as Handler)(cause)
}
}
/**
* Registers [handler] to be invoked when this channel is closed.
* If the channel is already closed for send, the handler is invoked right away with the close cause.
* @throws IllegalStateException if another handler was already registered (or was already invoked).
*/
override fun invokeOnClose(handler: Handler) {
// Intricate dance for concurrent invokeOnClose and close calls
if (!onCloseHandler.compareAndSet(null, handler)) {
// the slot was not empty: it holds either another handler or the HANDLER_INVOKED marker
val value = onCloseHandler.value
if (value === HANDLER_INVOKED) {
throw IllegalStateException("Another handler was already registered and successfully invoked")
}
throw IllegalStateException("Another handler was already registered: $value")
} else {
val closedToken = closedForSend
if (closedToken != null && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
// We get here only when the CAS has succeeded;
// a failed CAS means that the close() call has already invoked the handler
(handler)(closedToken.closeCause)
}
}
}
/**
* Completes the close operation for the given [closed] token: removes all receivers that are queued
* before the token, resumes them as closed and then calls [onClosedIdempotent].
* It is invoked by every operation that observes the closed token, not only by `close` itself.
*/
private fun helpClose(closed: Closed<*>) {
/*
* It's important to traverse list from right to left to avoid races with sender.
* Consider channel state: head -> [receive_1] -> [receive_2] -> head
* - T1 calls receive()
* - T2 calls close()
* - T3 calls close() + send(value)
*
* If both will traverse list from left to right, following non-linearizable history is possible:
* [close -> false], [send -> transferred 'value' to receiver]
*
* Another problem with linearizability of close is that we cannot resume closed receives until all
* receivers are removed from the list.
* Consider channel state: head -> [receive_1] -> [receive_2] -> head
* - T1 called receive_2, and will call send() when its receive call resumes
* - T2 calls close()
*
* Now if T2's close resumes T1's receive_2 then its receive gets "closed for receive" exception, but
* its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
*/
var closedList = InlineList<Receive<E>>()
while (true) {
// Break when channel is empty or has no receivers
@Suppress("UNCHECKED_CAST")
val previous = closed.prevNode as? Receive<E> ?: break
if (!previous.remove()) {
// failed to remove the node (due to race) -- retry finding non-removed prevNode
// NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
previous.helpRemove() // make sure remove is complete before continuing
continue
}
// add removed nodes to a separate list
closedList += previous
}
/*
* Now notify all removed nodes that the channel was closed
* in the order they were added to the channel
*/
closedList.forEachReversed { it.resumeReceiveClosed(closed) }
// and do other post-processing
onClosedIdempotent(closed)
}
/**
* Invoked as the last step of closing the channel, after all pending receivers were resumed as closed.
* It runs on every pass that helps to complete the close (not only from [close] itself), so
* this method should be idempotent and can be called multiple times.
* The default implementation does nothing.
*/
protected open fun onClosedIdempotent(closed: LockFreeLinkedListNode) {}
/**
* Retrieves first receiving waiter from the queue or returns closed token.
* Callers in this file treat a `null` result as "nobody to offer the element to".
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
// ------ registerSelectSend ------
/**
* Creates a descriptor that removes the first receiver from [queue] in order to hand [element] over to it.
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
/**
* Remove-first descriptor that is used to offer [element] to the first receiver in the queue.
* @suppress **This is unstable API and it is subject to change.**
*/
protected class TryOfferDesc<E>(
@JvmField val element: E,
queue: LockFreeLinkedListHead
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
// Closed token -> reported as is; a node that is not a receiver -> OFFER_FAILED; otherwise no failure
override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
is Closed<*> -> affected
!is ReceiveOrClosed<*> -> OFFER_FAILED
else -> null
}
@Suppress("UNCHECKED_CAST")
override fun onPrepare(prepareOp: PrepareOp): Any? {
val affected = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
// a null token means that this receiver cannot take the element
val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
assert { token === RESUME_TOKEN }
return null
}
}
/**
* Select clause for sending an element to this channel.
* Every read of this property creates a new clause object that forwards to `registerSelectSend`.
*/
final override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
registerSelectSend(select, param, block)
}
}
/**
* Registers a send of [element] as a clause of [select].
* Loops until the select gets selected elsewhere, a waiting sender node is queued,
* the element is accepted (then [block] is started right away), or the channel turns out to be closed.
* A closed channel is reported by throwing the send exception from this function.
*/
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
while (true) {
if (select.isSelected) return
if (isFullImpl) {
val node = SendSelect(element, this, select, block)
val enqueueResult = enqueueSend(node)
when {
enqueueResult == null -> { // enqueued successfully
// the node is disposed (see SendSelect.dispose) when the select completes
select.disposeOnSelect(node)
return
}
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, enqueueResult))
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
enqueueResult is Receive<*> -> {} // try to offer
else -> error("enqueueSend returned $enqueueResult ")
}
}
// hm... receiver is waiting or buffer is not full. try to offer
val offerResult = offerSelectInternal(element, select)
when {
offerResult === ALREADY_SELECTED -> return
offerResult === OFFER_FAILED -> {} // retry
offerResult === RETRY_ATOMIC -> {} // retry
offerResult === OFFER_SUCCESS -> {
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
}
offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, offerResult))
else -> error("offerSelectInternal returned $offerResult")
}
}
}
// ------ debug ------
// Debug representation: class name and address, the state of the queue in braces,
// followed by the subclass-specific description of the buffer
public override fun toString() =
"$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
// Describes the queue for toString: the kind of its first node and, when the queue holds more than
// that single node, the number of nodes and the closed token found at its end (if any).
private val queueDebugStateString: String
    get() {
        val first = queue.nextNode
        if (first === queue) return "EmptyQueue"
        val firstDescription = when (first) {
            is Closed<*> -> first.toString()
            is Receive<*> -> "ReceiveQueued"
            is Send -> "SendQueued"
            else -> "UNEXPECTED:$first" // should not happen
        }
        val last = queue.prevNode
        // a single node in the queue -- its description says it all
        if (last === first) return firstDescription
        val sizePart = ",queueSize=${countQueueSize()}"
        val closedPart = if (last is Closed<*>) ",closedForSend=$last" else ""
        return firstDescription + sizePart + closedPart
    }
// Counts the nodes of the queue by walking over it; used only to build the debug string
private fun countQueueSize(): Int {
var size = 0
queue.forEach<LockFreeLinkedListNode> { size++ }
return size
}
// Suffix appended by toString; empty by default, open for subclasses to override
protected open val bufferDebugString: String get() = ""
// ------ private ------
/**
* Queue node of a sender that waits as a clause of a select expression.
* It is also the handle that is disposed when the select completes.
*/
private class SendSelect<E, R>(
override val pollResult: E, // E | Closed - the result pollInternal returns when it rendezvous with this node
@JvmField val channel: AbstractSendChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (SendChannel<E>) -> R
) : Send(), DisposableHandle {
// resuming this sender means winning the selection of its select instance
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
select.trySelectOther(otherOp) as Symbol? // must return symbol
// the element was taken -- run the block of the clause with the channel as its receiver
override fun completeResumeSend() {
block.startCoroutineCancellable(receiver = channel, completion = select.completion)
}
override fun dispose() { // invoked on select completion
if (!remove()) return
// if the node was successfully removed (meaning it was added but was not received) then element not delivered
undeliveredElement()
}
// the select fails with the send exception, but only if it was not selected otherwise yet
override fun resumeSendClosed(closed: Closed<*>) {
if (select.trySelect())
select.resumeSelectWithException(closed.sendException)
}
override fun undeliveredElement() {
channel.onUndeliveredElement?.callUndeliveredElement(pollResult, select.completion.context)
}
override fun toString(): String = "SendSelect@$hexAddress($pollResult)[$channel, $select]"
}
/**
* Queue node that carries a buffered [element] without any waiting sender behind it:
* resuming it always succeeds and there is nothing to do on completion.
*/
internal class SendBuffered<out E>(
@JvmField val element: E
) : Send() {
override val pollResult: Any? get() = element
// always succeeds; finishes the prepare phase of the other operation, if there is one
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeSend() {}
/**
* This method should be never called, see special logic in [LinkedListChannel.onCancelIdempotentList].
*/
override fun resumeSendClosed(closed: Closed<*>) {
assert { false }
}
override fun toString(): String = "SendBuffered@$hexAddress($element)"
}
}
/**
* Abstract send/receive channel. It is a base class for all channel implementations.
*/
internal abstract class AbstractChannel<E>(
onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E> {
// ------ extension points for buffered channels ------
/**
* Returns `true` if [isBufferEmpty] is always `true`.
* When it is `true`, [enqueueReceiveInternal] appends the receiver without re-checking [isBufferEmpty].
* @suppress **This is unstable API and it is subject to change.**
*/
protected abstract val isBufferAlwaysEmpty: Boolean
/**
* Returns `true` if this channel's buffer is empty.
* This operation should be atomic if it is invoked by [enqueueReceive].
* It is also read by [isEmptyImpl] and [isClosedForReceive].
* @suppress **This is unstable API and it is subject to change.**
*/
protected abstract val isBufferEmpty: Boolean
// ------ internal functions for override by buffered channels ------
/**
 * Tries to remove element from buffer or from queued sender.
 * This base implementation only looks at queued senders: it keeps taking the first sender
 * (or peeking the closed token) until one of them can be resumed or none is left.
 * Return type is `E | POLL_FAILED | Closed`
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollInternal(): Any? {
    while (true) {
        val sender = takeFirstSendOrPeekClosed()
        if (sender == null) return POLL_FAILED
        val resumeToken = sender.tryResumeSend(null)
        if (resumeToken == null) {
            // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
            sender.undeliveredElement()
            continue
        }
        assert { resumeToken === RESUME_TOKEN }
        sender.completeResumeSend()
        return sender.pollResult
    }
}
/**
 * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
 * The sender is taken from the queue by an atomic operation performed together with the selection.
 * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
    // poll atomically with select
    val tryPoll = describeTryPoll()
    // a non-null outcome of the atomic operation is a failure that is reported to the caller as is
    select.performAtomicTrySelect(tryPoll)?.let { failure -> return failure }
    val sender = tryPoll.result
    sender.completeResumeSend()
    return sender.pollResult
}
// ------ state functions & helpers for concrete implementations ------
/**
* Returns `true` if the first node of the queue is a waiting receiver or the closed token.
* @suppress **This is unstable API and it is subject to change.**
*/
protected val hasReceiveOrClosed: Boolean get() = queue.nextNode is ReceiveOrClosed<*>
// ------ ReceiveChannel ------
// Closed for receive only when the closed token is first in the queue and the buffer holds nothing more
public override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
public override val isEmpty: Boolean get() = isEmptyImpl
// Nothing to receive right now: no sender node is first in the queue and the buffer is empty
protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty
/**
* Receives an element, suspending only when none is available right away.
* A closed channel is reported with an exception, which is produced on the slow path.
*/
public final override suspend fun receive(): E {
// fast path -- try poll non-blocking
val result = pollInternal()
/*
* If result is Closed -- go to tail-call slow-path that will allow us to
* properly recover stacktrace without paying a performance cost on fast path.
* We prefer to recover stacktrace using suspending path to have a more precise stacktrace.
*/
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// slow-path does suspend
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}
/**
* Slow path of [receive] and [receiveCatching]. Alternates between queueing a waiting receiver node and
* retrying [pollInternal], until the receiver is queued, an element is polled,
* or the channel turns out to be closed. [receiveMode] selects how the outcome is delivered.
*/
@Suppress("UNCHECKED_CAST")
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
// use the node flavour that knows about the undelivered-element handler only when one is installed
val receive = if (onUndeliveredElement == null)
ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
while (true) {
if (enqueueReceive(receive)) {
removeReceiveOnCancel(cont, receive)
return@sc
}
// hm... something is not right. try to poll
val result = pollInternal()
if (result is Closed<*>) {
// the node was not queued, it is used here only to resume cont according to receiveMode
receive.resumeReceiveClosed(result)
return@sc
}
if (result !== POLL_FAILED) {
cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
return@sc
}
}
}
/**
 * Tries to append the waiting receiver [receive] to the end of the queue and returns `true` on success.
 * The receiver is never added after a sender node; unless the buffer is always empty,
 * the buffer must also be empty for the addition to go through.
 */
protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean =
    when {
        isBufferAlwaysEmpty -> queue.addLastIfPrev(receive) { it !is Send }
        else -> queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
    }
/**
 * Queues [receive] via [enqueueReceiveInternal] and notifies [onReceiveEnqueued] when it was queued.
 * Returns `true` if the receiver was queued.
 */
private fun enqueueReceive(receive: Receive<E>): Boolean {
    val enqueued = enqueueReceiveInternal(receive)
    if (enqueued) onReceiveEnqueued()
    return enqueued
}
/**
* Receives an element wrapped into a [ChannelResult], suspending only when nothing is available right away.
* Unlike [receive], a closed token polled on the fast path is converted into a result immediately.
*/
@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveCatching(): ChannelResult<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
// slow-path does suspend
return receiveSuspend(RECEIVE_RESULT)
}
/**
 * Non-suspending receive. Maps the outcome of [pollInternal] to a [ChannelResult]:
 * plain failure (nothing to poll), closed with the close cause, or success with the polled element.
 */
@Suppress("UNCHECKED_CAST")
public final override fun tryReceive(): ChannelResult<E> {
    val polled = pollInternal()
    return when {
        polled === POLL_FAILED -> ChannelResult.failure()
        polled is Closed<*> -> ChannelResult.closed(polled.closeCause)
        else -> ChannelResult.success(polled as E)
    }
}
// Hidden overload that is retained for binary compatibility only; it simply forwards to cancelInternal
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
final override fun cancel(cause: Throwable?): Boolean =
cancelInternal(cause)
/**
* Cancels reception of elements from this channel.
* Does nothing when the channel is already closed for receive; otherwise closes it with [cause]
* (or with a newly created [CancellationException] when [cause] is `null`) and processes the pending senders.
*/
final override fun cancel(cause: CancellationException?) {
/*
* Do not create an exception if channel is already cancelled.
* Channel is closed for receive when either it is cancelled (then we are free to bail out)
* or was closed and elements were received.
* Then `onCancelIdempotent` does nothing for all implementations.
*/
if (isClosedForReceive) return
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}
// It needs to be internal to support deprecated cancel(Throwable?) API.
// Closes the channel with the given cause, runs the cancellation hook with the outcome of close
// and returns that outcome.
internal fun cancelInternal(cause: Throwable?): Boolean {
    val wasClosed = close(cause)
    onCancelIdempotent(wasClosed)
    return wasClosed
}
/**
* Method that is invoked right after [close] in [cancel] sequence.
* [wasClosed] is directly mapped to the value returned by [close].
* This implementation removes every node that is queued before the closed token
* and passes the removed nodes to [onCancelIdempotentList].
*/
protected open fun onCancelIdempotent(wasClosed: Boolean) {
/*
* See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
* has the same rationale.
*/
val closed = closedForSend ?: error("Cannot happen")
var list = InlineList<Send>()
while (true) {
val previous = closed.prevNode
// reached the head of the queue -- nothing is left before the closed token
if (previous is LockFreeLinkedListHead) {
break
}
assert { previous is Send }
if (!previous.remove()) {
previous.helpRemove() // make sure remove is complete before continuing
continue
}
// Add to the list only **after** successful removal
list += previous as Send
}
onCancelIdempotentList(list, closed)
}
/**
* Resumes the senders removed on cancellation as closed, in the order reverse to the order of [list].
* This method is overridden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list.
*/
protected open fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
list.forEachReversed { it.resumeSendClosed(closed) }
}
// Every call creates a new iterator over this channel with its own cached poll result
public final override fun iterator(): ChannelIterator<E> = Itr(this)
// ------ registerSelectReceive ------
/**
* Creates a descriptor that removes the first sender from [queue] in order to take its element.
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
/**
* Remove-first descriptor that is used to poll an element from the first sender in the queue.
* @suppress **This is unstable API and it is subject to change.**
*/
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
// Closed token -> reported as is; a node that is not a sender -> POLL_FAILED; otherwise no failure
override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
is Closed<*> -> affected
!is Send -> POLL_FAILED
else -> null
}
@Suppress("UNCHECKED_CAST")
override fun onPrepare(prepareOp: PrepareOp): Any? {
val affected = prepareOp.affected as Send // see "failure" impl
// a null token means that this sender cannot be resumed anymore
val token = affected.tryResumeSend(prepareOp) ?: return REMOVE_PREPARED
if (token === RETRY_ATOMIC) return RETRY_ATOMIC
assert { token === RESUME_TOKEN }
return null
}
override fun onRemoved(affected: LockFreeLinkedListNode) {
// Called when we removed it from the queue but were too late to resume, so we have undelivered element
(affected as Send).undeliveredElement()
}
}
/**
* Select clause for receiving an element; a closed channel fails the select with an exception.
* Every read of this property creates a new clause object.
*/
final override val onReceive: SelectClause1<E>
get() = object : SelectClause1<E> {
@Suppress("UNCHECKED_CAST")
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
}
}
/**
* Select clause for receiving a [ChannelResult] that represents either an element or the closed state.
* Every read of this property creates a new clause object.
*/
final override val onReceiveCatching: SelectClause1<ChannelResult<E>>
get() = object : SelectClause1<ChannelResult<E>> {
@Suppress("UNCHECKED_CAST")
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {
registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
}
}
/**
* Registers a receive as a clause of [select], delivering the outcome according to [receiveMode].
* Loops until the select gets selected elsewhere, a waiting receiver node is queued,
* or something is polled from the channel.
*/
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
while (true) {
if (select.isSelected) return
if (isEmptyImpl) {
// nothing to poll -- try to queue a waiting receiver, retry if the queue has changed meanwhile
if (enqueueReceiveSelect(select, block, receiveMode)) return
} else {
val pollResult = pollSelectInternal(select)
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult === RETRY_ATOMIC -> {} // retry
// an element or a closed token; note that the loop goes on after this call and
// relies on the isSelected check above (or on the exception thrown by the call) to stop
else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
}
}
}
}
/**
* Starts this select block with the polled [value], which is either an element or a [Closed] token.
* A closed token either throws the receive exception (`RECEIVE_THROWS_ON_CLOSE`) or, once the select
* is won via `trySelect`, starts the block with a closed [ChannelResult] (`RECEIVE_RESULT`).
*/
private fun <R> (suspend (Any?) -> R).tryStartBlockUnintercepted(select: SelectInstance<R>, receiveMode: Int, value: Any?) {
when (value) {
is Closed<*> -> {
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> {
throw recoverStackTrace(value.receiveException)
}
RECEIVE_RESULT -> {
// the select has to be won explicitly here, another clause might have been selected already
if (!select.trySelect()) return
startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)
}
}
}
else -> {
// NOTE(review): no trySelect in this branch -- presumably the select was already selected
// by the atomic poll that produced the element; confirm against pollSelectInternal
if (receiveMode == RECEIVE_RESULT) {
startCoroutineUnintercepted(value.toResult<Any>(), select.completion)
} else {
startCoroutineUnintercepted(value, select.completion)
}
}
}
}
/**
 * Queues a waiting receiver node for the given select clause.
 * When the node was queued, it is registered to be disposed on completion of the select.
 * Returns `true` if the node was queued.
 */
private fun <R> enqueueReceiveSelect(
    select: SelectInstance<R>,
    block: suspend (Any?) -> R,
    receiveMode: Int
): Boolean {
    val receiver = ReceiveSelect(this, select, block, receiveMode)
    return enqueueReceive(receiver).also { enqueued ->
        if (enqueued) select.disposeOnSelect(receiver)
    }
}
// ------ protected ------
// Same as the base implementation, but additionally notifies onReceiveDequeued
// when an actual receiver (not the closed token) was taken from the queue
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    val first = super.takeFirstReceiveOrPeekClosed()
    if (first != null && first !is Closed<*>) onReceiveDequeued()
    return first
}
/**
* Invoked when receiver is successfully enqueued to the queue of waiting receivers.
* The default implementation does nothing.
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun onReceiveEnqueued() {}
/**
* Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
* The default implementation does nothing.
* @suppress **This is unstable API and it is subject to change.**
*/
protected open fun onReceiveDequeued() {}
// ------ private ------
// Installs a cancellation handler on cont that takes the queued receiver out of the queue
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
/**
* Cancellation handler that removes [receive] from the queue and
* notifies [onReceiveDequeued] if the removal was performed by this handler.
*/
private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : BeforeResumeCancelHandler() {
override fun invoke(cause: Throwable?) {
if (receive.remove())
onReceiveDequeued()
}
override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
}
/**
* [ChannelIterator] implementation over an [AbstractChannel].
* The outcome of the latest poll is cached in [result] by `hasNext` and consumed by `next`.
*/
private class Itr<E>(@JvmField val channel: AbstractChannel<E>) : ChannelIterator<E> {
// POLL_FAILED doubles as the "nothing is cached" marker
var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
override suspend fun hasNext(): Boolean {
// check for repeated hasNext
if (result !== POLL_FAILED) return hasNextResult(result)
// fast path -- try poll non-blocking
result = channel.pollInternal()
if (result !== POLL_FAILED) return hasNextResult(result)
// slow-path does suspend
return hasNextSuspend()
}
// Interprets a cached result: an element -> true, closed without a cause -> false,
// closed with a cause -> throws the receive exception
private fun hasNextResult(result: Any?): Boolean {
if (result is Closed<*>) {
if (result.closeCause != null) throw recoverStackTrace(result.receiveException)
return false
}
return true
}
// Slow path of hasNext: alternates between queueing a waiting receiver node and retrying the poll
private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable sc@ { cont ->
val receive = ReceiveHasNext(this, cont)
while (true) {
if (channel.enqueueReceive(receive)) {
channel.removeReceiveOnCancel(cont, receive)
return@sc
}
// hm... something is not right. try to poll
val result = channel.pollInternal()
this.result = result
if (result is Closed<*>) {
if (result.closeCause == null)
cont.resume(false)
else
cont.resumeWithException(result.receiveException)
return@sc
}
if (result !== POLL_FAILED) {
// the element is already cached in this.result; the cancellation function (if any) is bound to it
@Suppress("UNCHECKED_CAST")
cont.resume(true, channel.onUndeliveredElement?.bindCancellationFun(result as E, cont.context))
return@sc
}
}
}
@Suppress("UNCHECKED_CAST")
override fun next(): E {
val result = this.result
if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
if (result !== POLL_FAILED) {
// consume the cached element so that the following hasNext polls the channel again
this.result = POLL_FAILED
return result as E
}
throw IllegalStateException("'hasNext' should be called prior to 'next' invocation")
}
}
/**
* Queue node of a receiver that is suspended in `receive` or `receiveCatching`.
* [receiveMode] decides whether [cont] gets the bare element (or an exception when the channel is closed)
* or a [ChannelResult] in both cases.
*/
private open class ReceiveElement<in E>(
@JvmField val cont: CancellableContinuation<Any?>,
@JvmField val receiveMode: Int
) : Receive<E>() {
// the value to resume cont with: wrapped into ChannelResult in RECEIVE_RESULT mode, bare otherwise
fun resumeValue(value: E): Any? = when (receiveMode) {
RECEIVE_RESULT -> ChannelResult.success(value)
else -> value
}
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value)) ?: return null
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
return RESUME_TOKEN
}
override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
override fun resumeReceiveClosed(closed: Closed<*>) {
when {
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
else -> cont.resumeWithException(closed.receiveException)
}
}
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
}
/**
* [ReceiveElement] for channels with an undelivered-element handler: it supplies the function,
* bound to the element and to the context of the continuation, that is passed along when cont is resumed.
*/
private class ReceiveElementWithUndeliveredHandler<in E>(
cont: CancellableContinuation<Any?>,
receiveMode: Int,
@JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : ReceiveElement<E>(cont, receiveMode) {
override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
onUndeliveredElement.bindCancellationFun(value, cont.context)
}
/**
* Queue node of a receiver that is suspended in `hasNext` of [iterator].
* On completion it stores the received element (or the closed token) into the iterator
* and resumes [cont] with the answer of `hasNext`.
*/
private open class ReceiveHasNext<E>(
@JvmField val iterator: Itr<E>,
@JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(true, otherOp?.desc, resumeOnCancellationFun(value))
?: return null
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
otherOp?.finishPrepare()
return RESUME_TOKEN
}
override fun completeResumeReceive(value: E) {
/*
When otherOp != null invocation of tryResumeReceive can happen multiple times and much later,
but completeResumeReceive is called once so we set iterator result here.
*/
iterator.result = value
cont.completeResume(RESUME_TOKEN)
}
override fun resumeReceiveClosed(closed: Closed<*>) {
// closed without a cause -> hasNext answers false; closed with a cause -> hasNext throws
val token = if (closed.closeCause == null) {
cont.tryResume(false)
} else {
cont.tryResumeWithException(closed.receiveException)
}
// the closed token is stored in the iterator only when this call has resumed cont
if (token != null) {
iterator.result = closed
cont.completeResume(token)
}
}
override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
iterator.channel.onUndeliveredElement?.bindCancellationFun(value, cont.context)
override fun toString(): String = "ReceiveHasNext@$hexAddress"
}
/**
* Queue node of a receiver that waits as a clause of a select expression.
* It is also the handle that is disposed when the select completes.
*/
private class ReceiveSelect<R, E>(
@JvmField val channel: AbstractChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
) : Receive<E>(), DisposableHandle {
// resuming this receiver means winning the selection of its select instance
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
select.trySelectOther(otherOp) as Symbol?
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
// run the block of the clause with the element, wrapped into ChannelResult in RECEIVE_RESULT mode
block.startCoroutineCancellable(
if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
}
override fun resumeReceiveClosed(closed: Closed<*>) {
// nothing to do if the select was already selected otherwise
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)
}
}
override fun dispose() { // invoked on select completion
if (remove())
channel.onReceiveDequeued() // notify cancellation of receive
}
override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)
override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
}
}
// receiveMode values: they select how a receive waiter reports a closed channel.
// In this mode a closed channel is reported by resuming the waiter with Closed.receiveException.
internal const val RECEIVE_THROWS_ON_CLOSE = 0
// In this mode the outcome is delivered as a ChannelResult value (a closed result for a closed channel).
internal const val RECEIVE_RESULT = 1
@JvmField
@SharedImmutable
internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels
// Offer outcome reported by a live receiver node (see Receive.offerResult).
@JvmField
@SharedImmutable
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")
// Returned by offerInternal when there is no queued receiver (or closed token) to take the element.
@JvmField
@SharedImmutable
internal val OFFER_FAILED = Symbol("OFFER_FAILED")
// NOTE(review): presumably the pollInternal counterpart of OFFER_FAILED; usage is outside this section - confirm.
@JvmField
@SharedImmutable
internal val POLL_FAILED = Symbol("POLL_FAILED")
// NOTE(review): presumably signals that enqueueing a waiter did not succeed; usage is outside this section - confirm.
@JvmField
@SharedImmutable
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")
// Final state of the on-close handler slot, whose transitions are: null -> handler -> HANDLER_INVOKED.
@JvmField
@SharedImmutable
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
// Callback taking an optional Throwable. NOTE(review): presumably the type stored in the on-close handler slot - confirm.
internal typealias Handler = (Throwable?) -> Unit
/**
 * Represents sending waiter in the queue.
 *
 * Resumption is a two-step protocol: [tryResumeSend] decides whether the sender can be resumed and,
 * when it returns `RESUME_TOKEN`, the caller follows up with [completeResumeSend].
 */
internal abstract class Send : LockFreeLinkedListNode() {
// The value handed over when a receiver meets this node.
abstract val pollResult: Any? // E | Closed - the result pollInternal returns when it rendezvous with this node
// Tries to reserve this sender for resumption.
// Returns: null - failure,
// RETRY_ATOMIC for retry (only when otherOp != null),
// RESUME_TOKEN on success (call completeResumeSend)
// Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol?
// Second step of the resumption protocol; to be called after tryResumeSend returned RESUME_TOKEN.
abstract fun completeResumeSend()
// Notifies this sender about the closed token; SendElement resumes its continuation with closed.sendException.
abstract fun resumeSendClosed(closed: Closed<*>)
// Hook for reporting an element that was not delivered; does nothing by default.
open fun undeliveredElement() {}
}
/**
 * Represents receiver waiter in the queue or closed token.
 *
 * Handing over an element is a two-step protocol: [tryResumeReceive] decides whether the element can be
 * taken and, when it returns `RESUME_TOKEN`, the caller follows up with [completeResumeReceive].
 */
internal interface ReceiveOrClosed<in E> {
// What an offer that met this node reports back to its caller.
val offerResult: Any // OFFER_SUCCESS | Closed
// Tries to reserve this node for receiving the given value.
// Returns: null - failure,
// RETRY_ATOMIC for retry (only when otherOp != null),
// RESUME_TOKEN on success (call completeResumeReceive)
// Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol?
// Second step of the hand-over; to be called after tryResumeReceive returned RESUME_TOKEN.
fun completeResumeReceive(value: E)
}
/**
 * Represents sender for a specific element.
 *
 * [pollResult] holds the element being sent and [cont] is the continuation that is resumed
 * with `Unit` on a successful hand-over or with [Closed.sendException] on close.
 */
internal open class SendElement<E>(
    override val pollResult: E,
    @JvmField val cont: CancellableContinuation<Unit>
) : Send() {
    /**
     * Attempts to reserve [cont] for resumption.
     * Returns [RESUME_TOKEN] on success and `null` when the continuation cannot be resumed.
     */
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        val resumeToken = cont.tryResume(Unit, otherOp?.desc)
        if (resumeToken == null) return null
        assert { resumeToken === RESUME_TOKEN } // no other non-null token is expected here
        // finishPrepare may only follow a successful tryResume, so that only a good affected node is saved
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {
        cont.completeResume(RESUME_TOKEN)
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        cont.resumeWithException(closed.sendException)
    }

    override fun toString(): String {
        return "$classSimpleName@$hexAddress($pollResult)"
    }
}
/**
 * A [SendElement] that reports its element to [onUndeliveredElement] when the node is
 * removed from the queue.
 */
internal class SendElementWithUndeliveredHandler<E>(
    pollResult: E,
    cont: CancellableContinuation<Unit>,
    @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : SendElement<E>(pollResult, cont) {
    /**
     * Removes the node and, only when the removal succeeded, reports the element as undelivered:
     * a successful removal means the node was added but its element was not received.
     */
    override fun remove(): Boolean {
        val removed = super.remove()
        if (removed) undeliveredElement()
        return removed
    }

    // Hands the element to the handler together with the context of the sender's continuation.
    override fun undeliveredElement() {
        onUndeliveredElement.callUndeliveredElement(pollResult, cont.context)
    }
}
/**
 * Represents closed channel.
 *
 * The token is both a [Send] and a [ReceiveOrClosed], and it reports itself as the result on both sides.
 * Its resumption attempts always succeed with [RESUME_TOKEN] and the completion steps do nothing.
 */
internal class Closed<in E>(
    @JvmField val closeCause: Throwable?
) : Send(), ReceiveOrClosed<E> {
    // The close cause when present, otherwise a fresh ClosedSendChannelException with the default message.
    val sendException: Throwable
        get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)

    // The close cause when present, otherwise a fresh ClosedReceiveChannelException with the default message.
    val receiveException: Throwable
        get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)

    override val offerResult get() = this
    override val pollResult get() = this

    override fun tryResumeSend(otherOp: PrepareOp?): Symbol {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {
        // nothing to complete for the closed token
    }

    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeReceive(value: E) {
        // nothing to complete for the closed token
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        assert { false } // "Should be never invoked"
    }

    override fun toString(): String {
        return "Closed@$hexAddress[$closeCause]"
    }
}
/**
 * Base class for receive waiters: a queue node that also implements [ReceiveOrClosed].
 */
internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
// A receive waiter always reports OFFER_SUCCESS as the offer result.
override val offerResult get() = OFFER_SUCCESS
// Notifies this waiter about the closed token; each subclass decides how to resume its receiver.
abstract fun resumeReceiveClosed(closed: Closed<*>)
// Optional callback that subclasses in this file pass as the on-cancellation argument when resuming
// the receiver with the given value; the default is null, i.e. no callback.
open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
}
// Converts a raw receive outcome into a ChannelResult: a Closed token becomes a closed result
// carrying its close cause, anything else is treated as a successfully received element.
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ChannelResult<E> = when (this) {
    is Closed<*> -> ChannelResult.closed(closeCause)
    else -> ChannelResult.success(this as E)
}
// Converts a Closed token into a closed ChannelResult that carries the token's close cause.
@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ChannelResult<E> {
    return ChannelResult.closed(closeCause)
}