Vsevolod Tolstopyatov | 6d1a6e3 | 2020-02-18 15:28:00 +0300 | [diff] [blame] | 1 | /* |
Aurimas Liutikas | c8879d6 | 2021-05-12 21:56:16 +0000 | [diff] [blame^] | 2 | * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Vsevolod Tolstopyatov | 6d1a6e3 | 2020-02-18 15:28:00 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame] | 5 | package kotlinx.coroutines.channels |
Roman Elizarov | 3aed4ee | 2017-03-06 12:21:05 +0300 | [diff] [blame] | 6 | |
SokolovaMaria | 0126dba | 2019-11-26 01:04:07 +0300 | [diff] [blame] | 7 | import kotlinx.coroutines.* |
Vsevolod Tolstopyatov | 69c15b3 | 2018-11-09 18:14:00 +0300 | [diff] [blame] | 8 | import kotlinx.coroutines.internal.* |
SokolovaMaria | 0126dba | 2019-11-26 01:04:07 +0300 | [diff] [blame] | 9 | import kotlinx.coroutines.selects.* |
Roman Elizarov | 3aed4ee | 2017-03-06 12:21:05 +0300 | [diff] [blame] | 10 | |
/**
 * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
 * so that the receiver always gets the most recently sent element.
 * Back-to-back sent elements are _conflated_ -- only the most recently sent element is received,
 * while previously sent elements **are lost**.
 * Sender to this channel never suspends and [offer] always returns `true`.
 *
 * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
 *
 * Implementation notes: the single buffered element lives in [value] (the `EMPTY` marker means "nothing buffered")
 * and every update of it happens while holding [lock]. When a buffered element is replaced or dropped,
 * it is passed to the `onUndeliveredElement` handler (if one was supplied).
 */
internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
    // Guards every update of `value`; receiver enqueueing is performed under the same lock.
    private val lock = ReentrantLock()

    // The buffered element, or the `EMPTY` marker when nothing is buffered.
    private var value: Any? = EMPTY

    protected final override val isBufferAlwaysEmpty: Boolean
        get() = false

    protected final override val isBufferEmpty: Boolean
        get() = value === EMPTY

    protected final override val isBufferAlwaysFull: Boolean
        get() = false

    protected final override val isBufferFull: Boolean
        get() = false

    override val isEmpty: Boolean
        get() = lock.withLock { isEmptyImpl }

    // result is `OFFER_SUCCESS | Closed`
    //
    // Under the lock: a closed-for-send channel yields its closed token; an empty buffer is first matched
    // against queued receivers; otherwise the element is stored into the buffer, replacing the previous one.
    // The lock block evaluates to the receiver that accepted the element; every other outcome returns
    // (or throws) straight from this function.
    protected override fun offerInternal(element: E): Any {
        val matched: ReceiveOrClosed<E> = lock.withLock {
            closedForSend?.let { return it }
            if (value === EMPTY) {
                // nothing buffered -> look for receivers that were waiting on the empty buffer
                while (true) {
                    val candidate = takeFirstReceiveOrPeekClosed() ?: break // no receivers queued
                    if (candidate is Closed) return candidate
                    val token = candidate.tryResumeReceive(element, null)
                    if (token != null) {
                        assert { token === RESUME_TOKEN }
                        return@withLock candidate
                    }
                }
            }
            // no receiver took the element -> conflate it into the buffer
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // offer met a receiver: finish the hand-off outside of the lock
        matched.completeResumeReceive(element)
        return matched.offerResult
    }

    // result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`
    //
    // Select-aware counterpart of `offerInternal`: the hand-off to a receiver is attempted via
    // `performAtomicTrySelect`, and storing into the buffer requires winning `trySelect` first.
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        val matched: ReceiveOrClosed<E> = lock.withLock {
            closedForSend?.let { return it }
            if (value === EMPTY) {
                retry@ while (true) {
                    val offerOp = describeTryOffer(element)
                    val failure = select.performAtomicTrySelect(offerOp)
                    when {
                        failure == null -> return@withLock offerOp.result // offered successfully
                        failure === OFFER_FAILED -> break@retry // cannot offer -> Ok to queue to buffer
                        failure === RETRY_ATOMIC -> continue@retry // retry
                        failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
                        else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                    }
                }
            }
            // try to select sending this element to buffer
            if (!select.trySelect()) return ALREADY_SELECTED
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // offer met a receiver: finish the hand-off outside of the lock
        matched.completeResumeReceive(element)
        return matched.offerResult
    }

    // result is `E | POLL_FAILED | Closed`
    //
    // Takes the buffered element (leaving the buffer empty); with an empty buffer the result is
    // the closed token when the channel is closed for send, and `POLL_FAILED` otherwise.
    protected override fun pollInternal(): Any? {
        return lock.withLock {
            if (value === EMPTY) {
                closedForSend ?: POLL_FAILED
            } else {
                value.also { value = EMPTY }
            }
        }
    }

    // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
    //
    // Same as `pollInternal`, but a non-empty buffer is only drained after `select.trySelect()` succeeds.
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        return lock.withLock {
            when {
                value === EMPTY -> closedForSend ?: POLL_FAILED
                !select.trySelect() -> ALREADY_SELECTED
                else -> value.also { value = EMPTY }
            }
        }
    }

    // Drops the buffered element (reporting it as undelivered), then delegates to the superclass.
    // An exception produced by the undelivered-element handler is thrown only after the superclass call returns.
    protected override fun onCancelIdempotent(wasClosed: Boolean) {
        val undeliveredElementException = lock.withLock { updateValueLocked(EMPTY) }
        super.onCancelIdempotent(wasClosed)
        undeliveredElementException?.let { throw it }
    }

    // Replaces the buffered value with `element`; the "Locked" suffix indicates the caller is expected to hold `lock`.
    // If a real element (anything but the `EMPTY` marker) is being replaced, it is first reported to
    // `onUndeliveredElement`; the exception captured by that call, if any, is returned rather than thrown.
    private fun updateValueLocked(element: Any?): UndeliveredElementException? {
        val previous = value
        val undeliveredElementException = when {
            previous === EMPTY -> null
            else -> onUndeliveredElement?.callUndeliveredElementCatchingException(previous as E)
        }
        value = element
        return undeliveredElementException
    }

    // Receivers are enqueued under the same lock that guards the buffer.
    override fun enqueueReceiveInternal(receive: Receive<E>): Boolean {
        return lock.withLock { super.enqueueReceiveInternal(receive) }
    }

    // ------ debug ------

    override val bufferDebugString: String
        get() = "(value=$value)"
}