blob: bb8ce693b739fd4af6dffe209a68985aeb443e6c [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030017package kotlinx.coroutines.experimental.channels
18
Roman Elizarove4b6f092018-03-06 13:37:42 +030019import kotlinx.coroutines.experimental.*
Roman Elizarov1216e912017-02-22 09:57:06 +030020import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +030021import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarove4b6f092018-03-06 13:37:42 +030022import kotlinx.coroutines.experimental.intrinsics.*
23import kotlinx.coroutines.experimental.selects.*
24import kotlin.coroutines.experimental.*
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030025
26/**
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030027 * Abstract send channel. It is a base class for all send channel implementations.
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030028 */
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030029public abstract class AbstractSendChannel<E> : SendChannel<E> {
/**
 * Head of the lock-free linked list of nodes queued on this channel. The members of this class
 * add, inspect and remove queued senders, queued receivers and the closed token through it.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val queue: LockFreeLinkedListHead = LockFreeLinkedListHead()
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030032
33 // ------ extension points for buffered channels ------
34
/**
 * Tells whether [isBufferFull] yields `true` at all times for this channel.
 * When it does, a suspending send is enqueued without consulting [isBufferFull] again.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysFull: Boolean
40
/**
 * Tells whether the buffer of this channel is currently full.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferFull: Boolean
46
Roman Elizarov2ad0e942017-02-28 19:14:08 +030047 // ------ internal functions for override by buffered channels ------
48
/**
 * Attempts to hand [element] over to the buffer or to a receiver waiting in the queue.
 *
 * The result is one of `OFFER_SUCCESS | OFFER_FAILED | Closed`: `OFFER_FAILED` is returned when
 * [takeFirstReceiveOrPeekClosed] yields nothing, otherwise the `offerResult` of the first node
 * that accepted the element is returned.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerInternal(element: E): Any {
    while (true) {
        val candidate = takeFirstReceiveOrPeekClosed()
        if (candidate == null) return OFFER_FAILED
        // a node that refuses the element is skipped and the next one is tried
        val resumeToken = candidate.tryResumeReceive(element, idempotent = null) ?: continue
        candidate.completeResumeReceive(resumeToken)
        return candidate.offerResult
    }
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030064
/**
 * Attempts to hand [element] over to the buffer or to a queued receiver, provided that the
 * clause of the given [select] statement was not selected yet.
 *
 * The result is one of `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
    val tryOffer = describeTryOffer(element)
    // the offer is performed atomically with the select; any non-null outcome is a failure
    select.performAtomicTrySelect(tryOffer)?.let { failure -> return failure }
    val receiver = tryOffer.result
    receiver.completeResumeReceive(tryOffer.resumeToken!!)
    return receiver.offerResult
}
Roman Elizarov1216e912017-02-22 09:57:06 +030079
Roman Elizarov2ad0e942017-02-28 19:14:08 +030080 // ------ state functions & helpers for concrete implementations ------
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030081
/**
 * The closed token when it is the last node of the queue, `null` otherwise.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForSend: Closed<*>?
    get() {
        val last = queue.prevNode
        return last as? Closed<*>
    }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030087
/**
 * The closed token when it is the first node of the queue, `null` otherwise.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForReceive: Closed<*>?
    get() {
        val first = queue.nextNode
        return first as? Closed<*>
    }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030093
/**
 * Takes the first waiting sender out of the queue, or returns the closed token
 * (which is only peeked at) when that is what the queue starts with.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun takeFirstSendOrPeekClosed(): Send? {
    return queue.removeFirstIfIsInstanceOfOrPeekIf<Send>({ node -> node is Closed<*> })
}
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300100
/**
 * Appends [element] to the queue as a buffered node.
 *
 * Returns `null` when the node was queued. When the node that would precede it is a
 * [ReceiveOrClosed] (the channel was already closed or a receiver is waiting), nothing is
 * queued and that node is returned instead.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
    val buffered = SendBuffered(element)
    queue.addLastIfPrev(buffered) { prev ->
        when (prev) {
            is ReceiveOrClosed<*> -> return@sendBuffered prev
            else -> true
        }
    }
    return null
}
113
/**
 * Appends [element] to the queue as a conflated node and then conflates the buffered node
 * that precedes it (see [conflatePreviousSendBuffered]).
 *
 * Returns `null` when the node was queued. When the node that would precede it is a
 * [ReceiveOrClosed] (the channel was already closed or a receiver is waiting), nothing is
 * queued and that node is returned instead.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
    val conflated = SendBuffered(element)
    queue.addLastIfPrev(conflated) { prev ->
        when (prev) {
            is ReceiveOrClosed<*> -> return@sendConflated prev
            else -> true
        }
    }
    conflatePreviousSendBuffered(conflated)
    return null
}
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300128
/**
 * Removes the node located right before [node] when that node is a buffered send element;
 * does nothing for any other kind of predecessor.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
    val previous = node.prevNode
    if (previous is SendBuffered<*>) previous.remove()
}
136
/**
 * Creates the descriptor of an operation that appends [element] to [queue] as a buffered node.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeSendBuffered(element: E): AddLastDesc<*> {
    return SendBufferedDesc(queue, element)
}
141
/**
 * Descriptor that appends a buffered node holding the element to the queue.
 * It reports `OFFER_FAILED` when the affected node is a [ReceiveOrClosed].
 */
private open class SendBufferedDesc<E>(
    queue: LockFreeLinkedListHead,
    element: E
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? =
        if (affected is ReceiveOrClosed<*>) OFFER_FAILED else null
}
151
/**
 * Creates the descriptor of an operation that appends [element] to [queue] as a conflated node.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeSendConflated(element: E): AddLastDesc<*> {
    return SendConflatedDesc(queue, element)
}
156
/**
 * Variant of [SendBufferedDesc] that, once the operation succeeded, additionally removes the
 * affected node when it is a buffered send element.
 */
private class SendConflatedDesc<E>(
    queue: LockFreeLinkedListHead,
    element: E
) : SendBufferedDesc<E>(queue, element) {
    override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
        super.finishOnSuccess(affected, next)
        // drop the previous SendBuffered
        if (affected is SendBuffered<*>) affected.remove()
    }
}
167
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300168 // ------ SendChannel ------
169
/** `true` when [closedForSend] holds a closed token. */
public final override val isClosedForSend: Boolean
    get() {
        return closedForSend != null
    }
/**
 * `true` when the first node of the queue is not a [ReceiveOrClosed] and [isBufferFull] holds.
 * [isBufferFull] is consulted only after the check of the first node passed.
 */
public final override val isFull: Boolean
    get() {
        if (queue.nextNode is ReceiveOrClosed<*>) return false
        return isBufferFull
    }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300172
/**
 * Sends [element] to this channel: first through the non-blocking [offer], and only when that
 * returns `false` through the suspending slow path.
 */
public final override suspend fun send(element: E) {
    // slow path (suspends) is entered only when the non-blocking fast path did not succeed
    if (!offer(element)) return sendSuspend(element)
}
179
/**
 * Offers [element] without suspending by translating the outcome of [offerInternal]:
 * `OFFER_SUCCESS` becomes `true` and `OFFER_FAILED` becomes `false`.
 *
 * A `Closed` outcome makes this function throw the `sendException` of the closed token;
 * any other outcome is reported through [error].
 */
public final override fun offer(element: E): Boolean {
    val result = offerInternal(element)
    if (result === OFFER_SUCCESS) return true
    if (result === OFFER_FAILED) return false
    if (result is Closed<*>) throw result.sendException
    error("offerInternal returned $result")
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300189
/**
 * Suspending slow path of [send].
 *
 * Repeats two steps until one of them settles the continuation: it tries to enqueue a send
 * node ([enqueueSend]), and when that is refused for a reason other than a closed channel it
 * tries [offerInternal] again. An `OFFER_FAILED` outcome starts the next round.
 */
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
    val sender = SendElement(element, cont)
    while (true) {
        val enqueueResult = enqueueSend(sender)
        if (enqueueResult == null) {
            // enqueued successfully
            cont.initCancellability() // make it properly cancellable
            cont.removeOnCancellation(sender)
            return@sc
        }
        if (enqueueResult is Closed<*>) {
            cont.resumeWithException(enqueueResult.sendException)
            return@sc
        }
        // a receiver is waiting or the buffer is not full -- try to offer
        val offerResult = offerInternal(element)
        if (offerResult === OFFER_SUCCESS) {
            cont.resume(Unit)
            return@sc
        }
        if (offerResult is Closed<*>) {
            cont.resumeWithException(offerResult.sendException)
            return@sc
        }
        if (offerResult !== OFFER_FAILED) error("offerInternal returned $offerResult")
        // OFFER_FAILED -- go for another round
    }
}
221
/**
 * Tries to append the [send] node to the queue. The result is:
 * * `null` -- the node was enqueued;
 * * `ENQUEUE_FAILED` -- the buffer is not full (the node was not enqueued);
 * * `ReceiveOrClosed<*>` -- a receiver is waiting or the channel is closed (the node was not enqueued).
 */
private fun enqueueSend(send: SendElement): Any? {
    if (!isBufferAlwaysFull) {
        // the node may only be added while the buffer is still full
        val added = queue.addLastIfPrevAndIf(send, { prev ->
            when (prev) {
                is ReceiveOrClosed<*> -> return@enqueueSend prev
                else -> true
            }
        }, { isBufferFull })
        return if (added) null else ENQUEUE_FAILED
    }
    queue.addLastIfPrev(send) { prev ->
        when (prev) {
            is ReceiveOrClosed<*> -> return@enqueueSend prev
            else -> true
        }
    }
    return null
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300243
/**
 * Closes this channel with the given [cause].
 *
 * Receivers found waiting in the queue are resumed with the closed token. As soon as no
 * receiver is left, the closed token is appended to the queue, [onClosed] and [afterClose]
 * are invoked and `true` is returned. Returns `false` when a closed token is already present.
 */
public override fun close(cause: Throwable?): Boolean {
    val token = Closed<E>(cause)
    while (true) {
        val first = takeFirstReceiveOrPeekClosed()
        when (first) {
            null -> {
                // queue is empty or has only senders -- try to add the "Closed" item as the last one
                val added = queue.addLastIfPrev(token) { prev ->
                    if (prev is Closed<*>) return@close false // already closed
                    prev !is ReceiveOrClosed<*> // only add close if no waiting receive
                }
                if (added) {
                    onClosed(token)
                    afterClose(cause)
                    return true
                }
                // not added -- retry
            }
            is Closed<*> -> return false // already marked as closed -- nothing to do
            else -> (first as Receive<E>).resumeReceiveClosed(token)
        }
    }
}
265
/**
 * Hook that [close] invokes right after the [Closed] element was added to the queue.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onClosed(closed: Closed<E>) {
}
271
/**
 * Hook that is invoked after a successful [close], following [onClosed].
 * The default implementation does nothing.
 */
protected open fun afterClose(cause: Throwable?) {
}
276
/**
 * Takes the first waiting receiver out of the queue, or returns the closed token
 * (which is only peeked at) when that is what the queue starts with.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    return queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ node -> node is Closed<*> })
}
283
284 // ------ registerSelectSend ------
285
/**
 * Creates the descriptor of an operation that offers [element] to the first node of [queue].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryOffer(element: E): TryOfferDesc<E> {
    return TryOfferDesc(element, queue)
}
290
/**
 * Descriptor of an operation that removes the first [ReceiveOrClosed] node of the queue and
 * offers [element] to it. The token produced by the node is kept in [resumeToken].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryOfferDesc<E>(
    @JvmField val element: E,
    queue: LockFreeLinkedListHead
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
    @JvmField var resumeToken: Any? = null

    // no receiver/closed node -> OFFER_FAILED; closed token -> the token itself; receiver -> proceed
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
        !is ReceiveOrClosed<*> -> OFFER_FAILED
        is Closed<*> -> affected
        else -> null
    }

    override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
        val token = node.tryResumeReceive(element, idempotent = this)
        if (token == null) return false
        resumeToken = token
        return true
    }
}
312
/**
 * Descriptor of an operation that appends a [SendSelect] node for the given select clause to
 * the queue. The operation is refused with `ENQUEUE_FAILED` when the buffer is not full or a
 * receiver is waiting, and with the closed token when the channel is closed.
 */
private inner class TryEnqueueSendDesc<R>(
    element: E,
    select: SelectInstance<R>,
    block: suspend (SendChannel<E>) -> R
) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
        if (affected !is ReceiveOrClosed<*>) return null
        return affected as? Closed<*> ?: ENQUEUE_FAILED
    }

    override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? =
        if (isBufferFull) super.onPrepare(affected, next) else ENQUEUE_FAILED

    override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
        super.finishOnSuccess(affected, next)
        // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
        node.disposeOnSelect()
    }
}
336
/**
 * Select clause for sending to this channel; registering the clause delegates to
 * [registerSelectSend]. A new clause object is created on every access.
 */
final override val onSend: SelectClause2<E, SendChannel<E>>
    get() {
        return object : SelectClause2<E, SendChannel<E>> {
            override fun <R> registerSelectClause2(
                select: SelectInstance<R>,
                param: E,
                block: suspend (SendChannel<E>) -> R
            ) = registerSelectSend(select, param, block)
        }
    }
343
/**
 * Registers a send clause of [select] for [element].
 *
 * While the select is not selected, each round either enqueues a waiting send node (when the
 * channel [isFull]) or offers the element right away. `ENQUEUE_FAILED` / `OFFER_FAILED` start
 * the next round, a `Closed` outcome throws its `sendException`, and a successful offer starts
 * [block] undispatched with this channel as its receiver.
 */
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
    while (!select.isSelected) {
        if (isFull) {
            val enqueueOp = TryEnqueueSendDesc(element, select, block)
            // a null outcome means that the node was enqueued
            val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
            if (enqueueResult === ALREADY_SELECTED) return
            if (enqueueResult is Closed<*>) throw enqueueResult.sendException
            if (enqueueResult !== ENQUEUE_FAILED)
                error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
            // ENQUEUE_FAILED -- retry
        } else {
            val offerResult = offerSelectInternal(element, select)
            if (offerResult === ALREADY_SELECTED) return
            if (offerResult === OFFER_SUCCESS) {
                block.startCoroutineUndispatched(receiver = this, completion = select.completion)
                return
            }
            if (offerResult is Closed<*>) throw offerResult.sendException
            if (offerResult !== OFFER_FAILED) error("offerSelectInternal returned $offerResult")
            // OFFER_FAILED -- retry
        }
    }
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300371
Roman Elizarove4b6f092018-03-06 13:37:42 +0300372 // ------ debug ------
373
/** Debug representation: class name, hex address, queue state and buffer state. */
public override fun toString(): String {
    return "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
}
376
/**
 * Textual summary of the queue for [toString]: what kind of node is first, and -- when the
 * last node differs from the first one -- the queue size and the closed token found at the end.
 */
private val queueDebugStateString: String
    get() {
        val head = queue.nextNode
        if (head === queue) return "EmptyQueue"
        val headText = when (head) {
            is Closed<*> -> head.toString()
            is Receive<*> -> "ReceiveQueued"
            is Send -> "SendQueued"
            else -> "UNEXPECTED:$head" // should not happen
        }
        val tail = queue.prevNode
        if (tail === head) return headText
        val sizeText = ",queueSize=${countQueueSize()}"
        val closedText = if (tail is Closed<*>) ",closedForSend=$tail" else ""
        return headText + sizeText + closedText
    }
394
/** Counts the nodes that `forEach` visits in [queue]. */
private fun countQueueSize(): Int {
    var count = 0
    queue.forEach<LockFreeLinkedListNode> { count += 1 }
    return count
}
400
/** Buffer description appended to [toString]; empty unless overridden. */
protected open val bufferDebugString: String
    get() = ""
402
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300403 // ------ private ------
404
/**
 * Queue node of a sender that waits as part of a select statement.
 *
 * Resuming the node selects the clause in [select] and starts [block] with [channel] as its
 * receiver; disposing the node removes it from the queue.
 */
private class SendSelect<E, R>(
    override val pollResult: Any?,
    @JvmField val channel: SendChannel<E>,
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (SendChannel<E>) -> R
) : LockFreeLinkedListNode(), Send, DisposableHandle {
    override fun tryResumeSend(idempotent: Any?): Any? {
        if (!select.trySelect(idempotent)) return null
        return SELECT_STARTED
    }

    override fun completeResumeSend(token: Any) {
        check(token === SELECT_STARTED)
        block.startCoroutine(receiver = channel, completion = select.completion)
    }

    fun disposeOnSelect() = select.disposeOnSelect(this)

    override fun dispose() {
        remove()
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        // the failure is delivered only if this clause wins the select
        if (!select.trySelect(null)) return
        select.resumeSelectCancellableWithException(closed.sendException)
    }

    override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
}
434
/**
 * Queue node that holds a buffered [element]. No sender is suspended on it: resuming always
 * yields `SEND_RESUMED` and being closed requires no action.
 */
private class SendBuffered<out E>(
    @JvmField val element: E
) : LockFreeLinkedListNode(), Send {
    override val pollResult: Any?
        get() = element

    override fun tryResumeSend(idempotent: Any?): Any? {
        return SEND_RESUMED
    }

    override fun completeResumeSend(token: Any) {
        check(token === SEND_RESUMED)
    }

    override fun resumeSendClosed(closed: Closed<*>) {
    }
}
443}
444
445/**
446 * Abstract send/receive channel. It is a base class for all channel implementations.
447 */
448public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
449 // ------ extension points for buffered channels ------
450
/**
 * Tells whether [isBufferEmpty] yields `true` at all times for this channel.
 * When it does, a receiver is enqueued without consulting [isBufferEmpty] again.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysEmpty: Boolean
456
/**
 * Tells whether the buffer of this channel is currently empty.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferEmpty: Boolean
462
463 // ------ internal functions for override by buffered channels ------
464
/**
 * Attempts to take an element out of the buffer or from a sender waiting in the queue.
 *
 * The result is one of `E | POLL_FAILED | Closed`: `POLL_FAILED` is returned when
 * [takeFirstSendOrPeekClosed] yields nothing, otherwise the `pollResult` of the first node
 * that agreed to be resumed is returned.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollInternal(): Any? {
    while (true) {
        val candidate = takeFirstSendOrPeekClosed()
        if (candidate == null) return POLL_FAILED
        // a node that refuses to be resumed is skipped and the next one is tried
        val resumeToken = candidate.tryResumeSend(idempotent = null) ?: continue
        candidate.completeResumeSend(resumeToken)
        return candidate.pollResult
    }
}
480
/**
 * Attempts to take an element out of the buffer or from a queued sender, provided that the
 * clause of the given [select] statement was not selected yet.
 *
 * The result is one of `ALREADY_SELECTED | E | POLL_FAILED | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
    val tryPoll = describeTryPoll()
    // the poll is performed atomically with the select; any non-null outcome is a failure
    select.performAtomicTrySelect(tryPoll)?.let { failure -> return failure }
    val sender = tryPoll.result
    sender.completeResumeSend(tryPoll.resumeToken!!)
    return tryPoll.pollResult
}
495
496 // ------ state functions & helpers for concrete implementations ------
497
/**
 * `true` when the first node of the queue is a [ReceiveOrClosed].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val hasReceiveOrClosed: Boolean
    get() {
        val first = queue.nextNode
        return first is ReceiveOrClosed<*>
    }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300502
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300503 // ------ ReceiveChannel ------
504
/**
 * `true` when [closedForReceive] holds a closed token and [isBufferEmpty] holds.
 * [isBufferEmpty] is consulted only after the closed token was found.
 */
public final override val isClosedForReceive: Boolean
    get() {
        if (closedForReceive == null) return false
        return isBufferEmpty
    }
/**
 * `true` when the first node of the queue is not a [Send] and [isBufferEmpty] holds.
 * [isBufferEmpty] is consulted only after the check of the first node passed.
 */
public final override val isEmpty: Boolean
    get() {
        if (queue.nextNode is Send) return false
        return isBufferEmpty
    }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300507
/**
 * Receives an element: first through the non-blocking [pollInternal], and only when that
 * reports `POLL_FAILED` through the suspending slow path.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receive(): E {
    val polled = pollInternal()
    // slow path (suspends) is entered only when the non-blocking fast path found nothing
    if (polled === POLL_FAILED) return receiveSuspend()
    return receiveResult(polled)
}
516
/**
 * Converts a poll outcome into the received element: a `Closed` token makes this function
 * throw its `receiveException`, anything else is returned as the element.
 */
@Suppress("UNCHECKED_CAST")
private fun receiveResult(result: Any?): E = when (result) {
    is Closed<*> -> throw result.receiveException
    else -> result as E
}
522
/**
 * Suspending slow path of [receive].
 *
 * Repeats two steps until one of them settles the continuation: it tries to enqueue a
 * receive node ([enqueueReceive]), and when that is refused it polls again. A closed token
 * resumes the continuation with its `receiveException`; `POLL_FAILED` starts the next round.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
    val waiter = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
    while (true) {
        if (enqueueReceive(waiter)) {
            cont.initCancellability() // make it properly cancellable
            removeReceiveOnCancel(cont, waiter)
            return@sc
        }
        // the node was not enqueued -- try to poll
        val polled = pollInternal()
        when {
            polled is Closed<*> -> {
                cont.resumeWithException(polled.receiveException)
                return@sc
            }
            polled !== POLL_FAILED -> {
                cont.resume(polled as E)
                return@sc
            }
        }
    }
}
544
/**
 * Tries to append the [receive] node to the queue. The node is added only when the node that
 * would precede it is not a [Send] and -- unless [isBufferAlwaysEmpty] -- the buffer is empty.
 * Invokes `onReceiveEnqueued` after a successful addition and returns whether the node was added.
 */
private fun enqueueReceive(receive: Receive<E>): Boolean {
    val enqueued = if (isBufferAlwaysEmpty) {
        queue.addLastIfPrev(receive, { prev -> prev !is Send })
    } else {
        queue.addLastIfPrevAndIf(receive, { prev -> prev !is Send }, { isBufferEmpty })
    }
    if (enqueued) onReceiveEnqueued()
    return enqueued
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300552
/**
 * Receives an element or `null`: first through the non-blocking [pollInternal], and only
 * when that reports `POLL_FAILED` through the suspending slow path.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrNull(): E? {
    val polled = pollInternal()
    // slow path (suspends) is entered only when the non-blocking fast path found nothing
    if (polled === POLL_FAILED) return receiveOrNullSuspend()
    return receiveOrNullResult(polled)
}
561
/**
 * Converts a poll outcome into the received element or `null`: a `Closed` token yields `null`
 * when it has no `closeCause` and throws that cause otherwise; anything else is returned as
 * the element.
 */
@Suppress("UNCHECKED_CAST")
private fun receiveOrNullResult(result: Any?): E? {
    if (result !is Closed<*>) return result as E
    val cause = result.closeCause ?: return null
    throw cause
}
570
/**
 * Suspending slow path of [receiveOrNull].
 *
 * Repeats two steps until one of them settles the continuation: it tries to enqueue a
 * receive node ([enqueueReceive]), and when that is refused it polls again. A closed token
 * resumes the continuation with `null` when it has no `closeCause` and with that cause
 * otherwise; `POLL_FAILED` starts the next round.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
    val waiter = ReceiveElement(cont, nullOnClose = true)
    while (true) {
        if (enqueueReceive(waiter)) {
            cont.initCancellability() // make it properly cancellable
            removeReceiveOnCancel(cont, waiter)
            return@sc
        }
        // the node was not enqueued -- try to poll
        val polled = pollInternal()
        when {
            polled is Closed<*> -> {
                val cause = polled.closeCause
                if (cause != null) cont.resumeWithException(cause) else cont.resume(null)
                return@sc
            }
            polled !== POLL_FAILED -> {
                cont.resume(polled as E)
                return@sc
            }
        }
    }
}
595
/**
 * Takes an element without suspending. Returns `null` when [pollInternal] reports
 * `POLL_FAILED`; every other outcome is converted by [receiveOrNullResult].
 */
@Suppress("UNCHECKED_CAST")
public final override fun poll(): E? {
    val polled = pollInternal()
    if (polled === POLL_FAILED) return null
    return receiveOrNullResult(polled)
}
601
/**
 * Cancels receiving: closes the channel with [cause], then cleans the queue of senders up.
 * Returns what [close] returned.
 */
override fun cancel(cause: Throwable?): Boolean {
    val closedNow = close(cause)
    cleanupSendQueueOnCancel()
    return closedNow
}
606
/**
 * Takes queued senders out of the queue one by one and resumes each of them with the closed
 * token, stopping as soon as the closed token itself is reached.
 *
 * Note: this function is invoked when channel is already closed.
 */
protected open fun cleanupSendQueueOnCancel() {
    val token = closedForSend ?: error("Cannot happen")
    while (true) {
        val sender = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
        if (sender !is Closed<*>) {
            sender.resumeSendClosed(token)
            continue
        }
        check(sender === token)
        return // cleaned
    }
}
619
/** Creates a new iterator over this channel. */
public final override fun iterator(): ChannelIterator<E> {
    return Itr(this)
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300621
Roman Elizarov1216e912017-02-22 09:57:06 +0300622 // ------ registerSelectReceive ------
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300623
/**
 * Creates the descriptor of an operation that polls the first node of [queue].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryPoll(): TryPollDesc<E> {
    return TryPollDesc(queue)
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300628
/**
 * Descriptor of an operation that removes the first [Send] node of the queue and resumes it.
 * The token produced by the node is kept in [resumeToken], its element in [pollResult].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
    @JvmField var resumeToken: Any? = null
    @JvmField var pollResult: E? = null

    // closed token -> the token itself; no sender -> POLL_FAILED; sender -> proceed
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
        is Closed<*> -> affected
        !is Send -> POLL_FAILED
        else -> null
    }

    @Suppress("UNCHECKED_CAST")
    override fun validatePrepared(node: Send): Boolean {
        val token = node.tryResumeSend(idempotent = this)
        if (token == null) return false
        resumeToken = token
        pollResult = node.pollResult as E
        return true
    }
}
650
/**
 * Descriptor of an operation that appends a [ReceiveSelect] node for the given select clause
 * to the queue. The operation is refused with `ENQUEUE_FAILED` when the buffer is not empty
 * or the affected node is a [Send].
 */
private inner class TryEnqueueReceiveDesc<E, R>(
    select: SelectInstance<R>,
    block: suspend (E?) -> R,
    nullOnClose: Boolean
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? =
        if (affected is Send) ENQUEUE_FAILED else null

    override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? =
        if (isBufferEmpty) super.onPrepare(affected, next) else ENQUEUE_FAILED

    override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
        super.finishOnSuccess(affected, next)
        // notify that there is one more receiver
        onReceiveEnqueued()
        // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
        node.removeOnSelectCompletion()
    }
}
674
// Select clause for receiving an element; registration is delegated to registerSelectReceive.
final override val onReceive: SelectClause1<E>
    get() {
        return object : SelectClause1<E> {
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
                registerSelectReceive(select, block)
            }
        }
    }
681
/**
 * Registers a receive clause with [select]. While the select is not yet selected it either
 * enqueues a waiting receiver (when the channel [isEmpty]) or polls an element and starts
 * [block] with it undispatched. A polled [Closed] token is rethrown as its receive exception.
 */
@Suppress("UNCHECKED_CAST")
private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
    while (!select.isSelected) {
        if (isEmpty) {
            val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
            val enqueueResult = select.performAtomicIfNotSelected(enqueueOp)
            // null and ALREADY_SELECTED both finish the registration
            if (enqueueResult == null || enqueueResult === ALREADY_SELECTED) return
            if (enqueueResult !== ENQUEUE_FAILED)
                error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
            // ENQUEUE_FAILED -- retry
        } else {
            val pollResult = pollSelectInternal(select)
            if (pollResult === ALREADY_SELECTED) return
            if (pollResult === POLL_FAILED) continue // retry
            if (pollResult is Closed<*>) throw pollResult.receiveException
            block.startCoroutineUndispatched(pollResult as E, select.completion)
            return
        }
    }
}
708
// Select clause for receiving an element or null; registration is delegated to registerSelectReceiveOrNull.
final override val onReceiveOrNull: SelectClause1<E?>
    get() {
        return object : SelectClause1<E?> {
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
                registerSelectReceiveOrNull(select, block)
            }
        }
    }
715
/**
 * Registers a receive-or-null clause with [select]. It works like the plain receive registration,
 * except that a polled [Closed] token without a close cause selects the clause and starts [block]
 * with `null`; a token that has a close cause makes this function throw that cause.
 */
@Suppress("UNCHECKED_CAST")
private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
    while (!select.isSelected) {
        if (isEmpty) {
            val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
            val enqueueResult = select.performAtomicIfNotSelected(enqueueOp)
            // null and ALREADY_SELECTED both finish the registration
            if (enqueueResult == null || enqueueResult === ALREADY_SELECTED) return
            if (enqueueResult !== ENQUEUE_FAILED)
                error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
            // ENQUEUE_FAILED -- retry
        } else {
            val pollResult = pollSelectInternal(select)
            if (pollResult === ALREADY_SELECTED) return
            if (pollResult === POLL_FAILED) continue // retry
            if (pollResult is Closed<*>) {
                val cause = pollResult.closeCause
                if (cause != null) throw cause
                // closed without a cause: deliver null if this clause wins the select
                if (select.trySelect(null))
                    block.startCoroutineUndispatched(null, select.completion)
                return
            }
            // selected successfully
            block.startCoroutineUndispatched(pollResult as E, select.completion)
            return
        }
    }
}
750
// ------ protected ------
752
// Delegates to the superclass and calls onReceiveDequeued when the result is a non-null,
// non-Closed node.
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    val taken = super.takeFirstReceiveOrPeekClosed()
    if (taken != null && taken !is Closed<*>) onReceiveDequeued()
    return taken
}
Roman Elizarov0a788392017-02-15 17:52:12 +0300757
/**
 * Hook that is called after a receiver has been successfully added to the queue of waiting receivers.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveEnqueued() = Unit
763
/**
 * Hook that is called after an enqueued receiver has been successfully removed from the queue
 * of waiting receivers. The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveDequeued() = Unit
Roman Elizarov0a788392017-02-15 17:52:12 +0300769
// ------ private ------
771
// Installs a cancellation handler on [cont] that removes [receive] from the queue.
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
    RemoveReceiveOnCancel(receive).asHandler.let { onCancel ->
        cont.invokeOnCancellation(handler = onCancel)
    }
774
// Cancellation handler: removes the receiver node and, if it was actually removed,
// reports that through onReceiveDequeued.
private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : CancelHandler() {
    override fun invoke(cause: Throwable?) {
        val removed = receive.remove()
        if (removed) onReceiveDequeued()
    }

    override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
}
782
/**
 * Channel iterator. [result] caches the outcome of the latest `hasNext`: an element,
 * a [Closed] token, or [POLL_FAILED] when nothing is cached.
 */
private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
    var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed

    override suspend fun hasNext(): Boolean {
        // nothing cached by a previous hasNext -- go and get something
        if (result === POLL_FAILED) {
            // fast path -- try poll non-blocking
            result = channel.pollInternal()
            // slow-path does suspend
            if (result === POLL_FAILED) return hasNextSuspend()
        }
        return hasNextResult(result)
    }

    // Translates a cached result into the hasNext answer; a close with a cause is rethrown.
    private fun hasNextResult(result: Any?): Boolean {
        if (result !is Closed<*>) return true
        if (result.closeCause != null) throw result.receiveException
        return false
    }

    private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
        val receive = ReceiveHasNext(this, cont)
        while (!channel.enqueueReceive(receive)) {
            // hm... something is not right. try to poll
            val polled = channel.pollInternal()
            this.result = polled
            when {
                polled is Closed<*> -> {
                    if (polled.closeCause == null)
                        cont.resume(false)
                    else
                        cont.resumeWithException(polled.receiveException)
                    return@sc
                }
                polled !== POLL_FAILED -> {
                    cont.resume(true)
                    return@sc
                }
            }
        }
        // the receiver is enqueued
        cont.initCancellability() // make it properly cancellable
        channel.removeReceiveOnCancel(cont, receive)
    }

    @Suppress("UNCHECKED_CAST")
    override suspend fun next(): E {
        val cached = this.result
        return when {
            cached is Closed<*> -> throw cached.receiveException
            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
            cached === POLL_FAILED -> channel.receive()
            else -> {
                this.result = POLL_FAILED
                cached as E
            }
        }
    }
}
841
/**
 * Receiver node backed by a continuation. On close it resumes [cont] with `null` when
 * [nullOnClose] is set and there is no close cause, otherwise with the receive exception.
 */
private class ReceiveElement<in E>(
    @JvmField val cont: CancellableContinuation<E?>,
    @JvmField val nullOnClose: Boolean
) : Receive<E>() {
    override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
        return cont.tryResume(value, idempotent)
    }

    override fun completeResumeReceive(token: Any) {
        cont.completeResume(token)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        val deliverNull = nullOnClose && closed.closeCause == null
        if (deliverNull) {
            cont.resume(null)
        } else {
            cont.resumeWithException(closed.receiveException)
        }
    }

    override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
}
856
/**
 * Receiver node used by the iterator's suspending `hasNext`: it resumes [cont] with `true`
 * or `false` and stores the received element or the [Closed] token into [iterator].
 */
private class ReceiveHasNext<E>(
    @JvmField val iterator: Itr<E>,
    @JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
    override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
        val token = cont.tryResume(true, idempotent) ?: return null
        /*
           When idempotent != null this invocation can be stale and we cannot directly update iterator.result
           Instead, we save both token & result into a temporary IdempotentTokenValue object and
           set iterator result only in completeResumeReceive that is going to be invoked just once
         */
        if (idempotent != null) return IdempotentTokenValue(token, value)
        iterator.result = value
        return token
    }

    override fun completeResumeReceive(token: Any) {
        if (token !is IdempotentTokenValue<*>) {
            cont.completeResume(token)
            return
        }
        // deferred result from an idempotent tryResumeReceive
        iterator.result = token.value
        cont.completeResume(token.token)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        val token = when (closed.closeCause) {
            null -> cont.tryResume(false)
            else -> cont.tryResumeWithException(closed.receiveException)
        } ?: return
        iterator.result = closed
        cont.completeResume(token)
    }

    override fun toString(): String = "ReceiveHasNext[$cont]"
}
895
/**
 * Receiver node that belongs to a select clause. The received element itself is used as the
 * resume token, with [NULL_VALUE] standing in for a `null` element. The node also acts as the
 * [DisposableHandle] that removes it from the queue.
 */
private inner class ReceiveSelect<R, in E>(
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (E?) -> R,
    @JvmField val nullOnClose: Boolean
) : Receive<E>(), DisposableHandle {
    override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
        if (!select.trySelect(idempotent)) return null
        return value ?: NULL_VALUE
    }

    @Suppress("UNCHECKED_CAST")
    override fun completeResumeReceive(token: Any) {
        // map the NULL_VALUE stand-in back to a real null
        val received = if (token === NULL_VALUE) null else token
        block.startCoroutine(received as E, select.completion)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        if (!select.trySelect(null)) return
        if (closed.closeCause == null && nullOnClose) {
            block.startCoroutine(null, select.completion)
        } else {
            // even though we are dispatching coroutine to process channel close on receive,
            // which is an atomically cancellable suspending function,
            // close is a final state, so we can use a cancellable resume mode
            select.resumeSelectCancellableWithException(closed.receiveException)
        }
    }

    fun removeOnSelectCompletion() = select.disposeOnSelect(this)

    // invoked on select completion
    override fun dispose() {
        val removed = remove()
        if (removed) onReceiveDequeued() // notify cancellation of receive
    }

    override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
}
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300934
// Pairs a continuation resume token with the received value, so that the value can be
// published later, when the resume is completed.
private class IdempotentTokenValue<out E>(@JvmField val token: Any, @JvmField val value: E)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300939}
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300940
/**
 * Marker for a successful offer; it is the `offerResult` of every waiting receiver node.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")

/**
 * Marker symbol. NOTE(review): presumably the unsuccessful counterpart of [OFFER_SUCCESS];
 * its uses are not in this part of the file -- confirm.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")

/**
 * Marker for a poll attempt that produced neither an element nor a [Closed] token.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val POLL_FAILED: Any = Symbol("POLL_FAILED")

/**
 * Marker reported when a receiver could not be added to the queue.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")

/**
 * Marker symbol. NOTE(review): its uses are not in this part of the file -- confirm the meaning
 * against the select-related code.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")

/**
 * Non-null stand-in for a `null` element where a resume token must not be `null`.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val NULL_VALUE: Any = Symbol("NULL_VALUE")

/**
 * Resume token that a [Closed] node returns from its `tryResume*` functions and checks in
 * its `completeResume*` functions.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")

/**
 * Marker symbol. NOTE(review): its uses are not in this part of the file -- confirm the meaning
 * against the send-related code.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField
internal val SEND_RESUMED = Symbol("SEND_RESUMED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300964
/**
 * Represents sending waiter in the queue.
 * @suppress **This is unstable API and it is subject to change.**
 */
public interface Send {
    /** What a poll obtains from this waiter: the element being sent (`E`) or a [Closed] token. */
    val pollResult: Any? // E | Closed

    /** Tries to resume this waiter; returns a token for [completeResumeSend] or `null` on failure. */
    fun tryResumeSend(idempotent: Any?): Any?

    /** Completes a resume that was started by [tryResumeSend], using the [token] it returned. */
    fun completeResumeSend(token: Any)

    /** Resumes this waiter because of the given [closed] token. */
    fun resumeSendClosed(closed: Closed<*>)
}
975
/**
 * Represents receiver waiter in the queue or closed token.
 * @suppress **This is unstable API and it is subject to change.**
 */
public interface ReceiveOrClosed<in E> {
    /** What an offer to this node yields: [OFFER_SUCCESS] for a receiver or the [Closed] token itself. */
    val offerResult: Any // OFFER_SUCCESS | Closed

    /** Tries to hand [value] to this node; returns a token for [completeResumeReceive] or `null` on failure. */
    fun tryResumeReceive(value: E, idempotent: Any?): Any?

    /** Completes a resume that was started by [tryResumeReceive], using the [token] it returned. */
    fun completeResumeReceive(token: Any)
}
985
/**
 * Represents sender for a specific element. The element is exposed as [pollResult] and
 * the suspended sender is resumed through [cont].
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
public class SendElement(
    override val pollResult: Any?,
    @JvmField val cont: CancellableContinuation<Unit>
) : LockFreeLinkedListNode(), Send {
    override fun tryResumeSend(idempotent: Any?): Any? {
        return cont.tryResume(Unit, idempotent)
    }

    override fun completeResumeSend(token: Any) {
        cont.completeResume(token)
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        // the sender fails with the send-side exception of the closed token
        cont.resumeWithException(closed.sendException)
    }

    override fun toString(): String = "SendElement($pollResult)[$cont]"
}
1000
/**
 * Represents closed channel. The node serves both as a [Send] and as a [ReceiveOrClosed];
 * it reports itself as the result of both a poll and an offer.
 * @suppress **This is unstable API and it is subject to change.**
 */
public class Closed<in E>(
    @JvmField val closeCause: Throwable?
) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
    /** The close cause, or a new [ClosedSendChannelException] when the channel was closed without one. */
    val sendException: Throwable
        get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)

    /** The close cause, or a new [ClosedReceiveChannelException] when the channel was closed without one. */
    val receiveException: Throwable
        get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)

    override val offerResult
        get() = this

    override val pollResult
        get() = this

    // resuming a closed token always yields the CLOSE_RESUMED marker, which is verified on completion
    override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED

    override fun completeResumeSend(token: Any) {
        check(token === CLOSE_RESUMED)
    }

    override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED

    override fun completeResumeReceive(token: Any) {
        check(token === CLOSE_RESUMED)
    }

    override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")

    override fun toString(): String = "Closed[$closeCause]"
}
1020
// Base class for receiver nodes in the queue: an offer to any of them reports OFFER_SUCCESS.
private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
    override val offerResult
        get() = OFFER_SUCCESS

    // Resumes this receiver because of the given closed token.
    abstract fun resumeReceiveClosed(closed: Closed<*>)
}
1024}
1025