blob: 3392b705744c14555b3978e394d59b47f7d66e9d [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03005package kotlinx.coroutines.experimental.channels
6
Vsevolod Tolstopyatov732474f2018-07-20 11:36:20 +03007import kotlinx.atomicfu.*
Roman Elizarove4b6f092018-03-06 13:37:42 +03008import kotlinx.coroutines.experimental.*
Roman Elizarov1216e912017-02-22 09:57:06 +03009import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +030010import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarove4b6f092018-03-06 13:37:42 +030011import kotlinx.coroutines.experimental.intrinsics.*
12import kotlinx.coroutines.experimental.selects.*
13import kotlin.coroutines.experimental.*
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030014
15/**
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030016 * Abstract send channel. It is a base class for all send channel implementations.
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030017 */
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030018public abstract class AbstractSendChannel<E> : SendChannel<E> {
/**
 * Head of the lock-free linked list on which this channel queues its nodes.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val queue = LockFreeLinkedListHead()

// ------ extension points for buffered channels ------

/**
 * Must report `true` when [isBufferFull] is `true` at all times.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysFull: Boolean

/**
 * Must report `true` while the buffer of this channel is full.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferFull: Boolean

// Slot for the close handler.
// State transitions: null -> handler -> HANDLER_INVOKED
private val onCloseHandler = atomic<Any?>(null)
38
Roman Elizarov2ad0e942017-02-28 19:14:08 +030039 // ------ internal functions for override by buffered channels ------
40
/**
 * Tries to add element to buffer or to queued receiver.
 * This base implementation only hands the element to the first queued receiver.
 * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerInternal(element: E): Any {
    while (true) {
        // nothing suitable at the head of the queue -> the offer fails
        val receiver = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
        // a null token means this node could not be resumed -> try the next one
        val resumeToken = receiver.tryResumeReceive(element, idempotent = null) ?: continue
        receiver.completeResumeReceive(resumeToken)
        return receiver.offerResult
    }
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030056
/**
 * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
 * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
    val tryOffer = describeTryOffer(element)
    // offer atomically with select; a non-null outcome is reported to the caller as is
    select.performAtomicTrySelect(tryOffer)?.let { return it }
    val receiver = tryOffer.result
    receiver.completeResumeReceive(tryOffer.resumeToken!!)
    return receiver.offerResult
}
Roman Elizarov1216e912017-02-22 09:57:06 +030071
Roman Elizarov2ad0e942017-02-28 19:14:08 +030072 // ------ state functions & helpers for concrete implementations ------
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030073
/**
 * Returns non-null closed token if it is last in the queue.
 * Calls [helpClose] on the token before returning it.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForSend: Closed<*>?
    get() {
        val token = (queue.prevNode as? Closed<*>) ?: return null
        helpClose(token)
        return token
    }

/**
 * Returns non-null closed token if it is first in the queue.
 * Calls [helpClose] on the token before returning it.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val closedForReceive: Closed<*>?
    get() {
        val token = (queue.nextNode as? Closed<*>) ?: return null
        helpClose(token)
        return token
    }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030085
/**
 * Retrieves first sending waiter from the queue or returns closed token.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun takeFirstSendOrPeekClosed(): Send? {
    return queue.removeFirstIfIsInstanceOfOrPeekIf<Send>({ node -> node is Closed<*> })
}
Roman Elizarov2ad0e942017-02-28 19:14:08 +030092
/**
 * Queues buffered element, returns null on success or
 * returns node reference if it was already closed or is waiting for receive.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
    val node = SendBuffered(element)
    queue.addLastIfPrev(node) { tail ->
        // a receiver or a close token at the tail -> do not enqueue, report that node
        if (tail is ReceiveOrClosed<*>) return@sendBuffered tail
        true
    }
    return null
}
105
/**
 * Queues conflated element, returns null on success or
 * returns node reference if it was already closed or is waiting for receive.
 * After a successful enqueue the directly preceding buffered element (if any) is removed.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
    val buffered = SendBuffered(element)
    queue.addLastIfPrev(buffered) { tail ->
        // a receiver or a close token at the tail -> do not enqueue, report that node
        if (tail is ReceiveOrClosed<*>) return@sendConflated tail
        true
    }
    conflatePreviousSendBuffered(buffered)
    return null
}
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300120
/**
 * Removes the node directly preceding [node] when that node is a buffered send element.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
    val previous = node.prevNode
    if (previous is SendBuffered<*>) previous.remove()
}
128
/**
 * Creates a descriptor that appends a buffered [element] to the [queue].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeSendBuffered(element: E): AddLastDesc<*> {
    return SendBufferedDesc(queue, element)
}

/**
 * Add-last descriptor for a buffered element.
 * It fails with `OFFER_FAILED` when the affected node is a receiver or a close token.
 */
private open class SendBufferedDesc<E>(
    queue: LockFreeLinkedListHead,
    element: E
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? =
        if (affected is ReceiveOrClosed<*>) OFFER_FAILED else null
}
143
/**
 * Creates a descriptor that appends a conflated [element] to the [queue].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeSendConflated(element: E): AddLastDesc<*> {
    return SendConflatedDesc(queue, element)
}

/**
 * Variant of [SendBufferedDesc] that, once the addition succeeds,
 * removes the affected node when it is a buffered send element.
 */
private class SendConflatedDesc<E>(
    queue: LockFreeLinkedListHead,
    element: E
) : SendBufferedDesc<E>(queue, element) {
    override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
        super.finishOnSuccess(affected, next)
        // remove previous SendBuffered
        if (affected is SendBuffered<*>) affected.remove()
    }
}
159
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300160 // ------ SendChannel ------
161
/** `true` when a close token is the last node of the queue. */
public final override val isClosedForSend: Boolean
    get() = closedForSend != null

/** `true` when the first queued node is neither a receiver nor a close token and the buffer is full. */
public final override val isFull: Boolean
    get() {
        if (queue.nextNode is ReceiveOrClosed<*>) return false
        return isBufferFull
    }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300164
/**
 * Sends [element]: first attempts a non-suspending [offer] and
 * falls back to [sendSuspend] when the offer is not accepted.
 */
public final override suspend fun send(element: E) {
    // fast path: a successful non-blocking offer completes the send
    val accepted = offer(element)
    if (accepted) return
    // slow path: suspend until the element can be sent
    return sendSuspend(element)
}
171
/**
 * Offers [element] without suspending.
 * Returns `true` when [offerInternal] reports `OFFER_SUCCESS` and `false` when it reports
 * `OFFER_FAILED` on a channel that is not closed for send.
 * Throws the send exception of the close token when the channel is closed.
 */
public final override fun offer(element: E): Boolean {
    val result = offerInternal(element)
    if (result === OFFER_SUCCESS) return true
    if (result === OFFER_FAILED) {
        // We should check for closed token on offer as well, otherwise offer won't be linearizable
        // in the face of concurrent close()
        val closed = closedForSend ?: return false
        throw closed.sendException
    }
    if (result is Closed<*>) throw result.sendException
    error("offerInternal returned $result")
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300183
/**
 * Slow path of [send]: loops between enqueueing a send node and re-trying [offerInternal]
 * until the node is enqueued, the element is offered, or a close token is observed.
 */
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
    val sendNode = SendElement(element, cont)
    while (true) {
        val enqueueResult = enqueueSend(sendNode)
        if (enqueueResult == null) {
            // enqueued successfully
            cont.initCancellability() // make it properly cancellable
            cont.removeOnCancellation(sendNode)
            return@sc
        }
        if (enqueueResult is Closed<*>) {
            helpClose(enqueueResult)
            cont.resumeWithException(enqueueResult.sendException)
            return@sc
        }
        // hm... receiver is waiting or buffer is not full. try to offer
        val offerResult = offerInternal(element)
        when {
            offerResult === OFFER_SUCCESS -> {
                cont.resume(Unit)
                return@sc
            }
            offerResult === OFFER_FAILED -> {} // go around the loop and try to enqueue again
            offerResult is Closed<*> -> {
                helpClose(offerResult)
                cont.resumeWithException(offerResult.sendException)
                return@sc
            }
            else -> error("offerInternal returned $offerResult")
        }
    }
}
217
/**
 * Tries to append [send] to the queue.
 *
 * Result is:
 * * null -- successfully enqueued
 * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
 * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
 */
private fun enqueueSend(send: SendElement): Any? {
    if (isBufferAlwaysFull) {
        // no buffer condition to check: only the tail node decides
        queue.addLastIfPrev(send) { tail ->
            if (tail is ReceiveOrClosed<*>) return@enqueueSend tail
            true
        }
        return null
    }
    val enqueued = queue.addLastIfPrevAndIf(send, { tail ->
        if (tail is ReceiveOrClosed<*>) return@enqueueSend tail
        true
    }, { isBufferFull })
    return if (enqueued) null else ENQUEUE_FAILED
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300239
/**
 * Closes this channel with an optional [cause].
 * Returns `true` when this call added the close token to the queue and
 * `false` when a close token was already the last node.
 */
public override fun close(cause: Throwable?): Boolean {
    val token = Closed<E>(cause)

    /*
     * Try to commit close by adding a close token to the end of the queue.
     * Successful -> we're now responsible for closing receivers
     * Not successful -> help closing pending receivers to maintain invariant
     * "if (!close()) next send will throw"
     */
    val committed = queue.addLastIfPrev(token) { tail -> tail !is Closed<*> }
    if (committed) {
        helpClose(token)
        invokeOnCloseHandler(cause)
        // TODO We can get rid of afterClose
        onClosed(token)
        afterClose(cause)
        return true
    }

    helpClose(queue.prevNode as Closed<*>)
    return false
}
262
/**
 * Invokes the registered close handler (if any) with [cause], at most once.
 *
 * The handler slot moves `null -> handler -> HANDLER_INVOKED`. The handler is invoked here only
 * when this call wins the CAS from the handler to `HANDLER_INVOKED`.
 */
@Suppress("UNCHECKED_CAST") // the slot holds only null, a Handler, or HANDLER_INVOKED
private fun invokeOnCloseHandler(cause: Throwable?) {
    val handler = onCloseHandler.value
    // nothing registered yet, or the handler has already been invoked
    if (handler === null || handler === HANDLER_INVOKED) return
    if (onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
        // CAS succeeded -> this call is the one that runs the handler
        (handler as Handler)(cause)
    }
    // CAS failed -> concurrent invokeOnClose() invoked handler
}
271
/**
 * Registers [handler] as the close handler of this channel.
 * When the channel is already closed for send, the handler is invoked right away with the
 * close cause, unless a concurrent `close()` has already invoked it.
 * Throws [IllegalStateException] when a handler was registered before.
 */
override fun invokeOnClose(handler: Handler) {
    // Intricate dance for concurrent invokeOnClose and close calls
    if (onCloseHandler.compareAndSet(null, handler)) {
        val closedToken = closedForSend
        if (closedToken != null && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
            // had this CAS failed, the close() call would have invoked the handler
            handler(closedToken.closeCause)
        }
        return
    }

    // registration lost: the slot already holds a handler or the HANDLER_INVOKED marker
    val value = onCloseHandler.value
    if (value === HANDLER_INVOKED) {
        throw IllegalStateException("Another handler was already registered and successfully invoked")
    }
    throw IllegalStateException("Another handler was already registered: $value")
}
289
/**
 * Removes the receivers queued directly before the [closed] token, one by one,
 * and resumes each of them with that token.
 */
private fun helpClose(closed: Closed<*>) {
    /*
     * It's important to traverse list from right to left to avoid races with sender.
     * Consider channel state
     * head sentinel -> [receiver 1] -> [receiver 2] -> head sentinel
     * T1 invokes receive()
     * T2 invokes close()
     * T3 invokes close() + send(value)
     *
     * If both will traverse list from left to right, following non-linearizable history is possible:
     * [close -> false], [send -> transferred 'value' to receiver]
     */
    while (true) {
        val previous = closed.prevNode
        // Channel is empty or has no receivers
        if (previous is LockFreeLinkedListHead || previous !is Receive<*>) return

        if (!previous.remove()) {
            // failed to remove the node (due to race) -- retry finding non-removed prevNode
            // NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
            previous.helpRemove() // make sure remove is complete before continuing
            continue
        }

        @Suppress("UNCHECKED_CAST")
        previous as Receive<E> // type assertion
        previous.resumeReceiveClosed(closed)
    }
}
321
/**
 * Hook invoked when [Closed] element was just added. Does nothing by default.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onClosed(closed: Closed<E>) {
}

/**
 * Hook invoked after successful [close]. Does nothing by default.
 */
protected open fun afterClose(cause: Throwable?) {
}
332
/**
 * Retrieves first receiving waiter from the queue or returns closed token.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    return queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ node -> node is Closed<*> })
}
339
340 // ------ registerSelectSend ------
341
/**
 * Creates a descriptor that tries to offer [element] to the first node of the [queue].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryOffer(element: E): TryOfferDesc<E> {
    return TryOfferDesc(element, queue)
}
346
/**
 * Remove-first descriptor that offers [element] to the first queued receiver.
 * The token obtained from the receiver is kept in [resumeToken].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryOfferDesc<E>(
    @JvmField val element: E,
    queue: LockFreeLinkedListHead
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
    @JvmField var resumeToken: Any? = null

    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
        !is ReceiveOrClosed<*> -> OFFER_FAILED // no receiver at the head
        is Closed<*> -> affected               // report the close token itself
        else -> null
    }

    override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
        // resumeToken stays untouched when the receiver refuses to be resumed
        resumeToken = node.tryResumeReceive(element, idempotent = this) ?: return false
        return true
    }
}
368
/**
 * Add-last descriptor that enqueues a [SendSelect] node for a select clause.
 * It refuses to enqueue when the buffer is not full or when the affected node
 * is a receiver or a close token.
 */
private inner class TryEnqueueSendDesc<R>(
    element: E,
    select: SelectInstance<R>,
    block: suspend (SendChannel<E>) -> R
) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
        // a close token is reported as is, a waiting receiver as ENQUEUE_FAILED
        is ReceiveOrClosed<*> -> if (affected is Closed<*>) affected else ENQUEUE_FAILED
        else -> null
    }

    override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? =
        if (isBufferFull) super.onPrepare(affected, next) else ENQUEUE_FAILED

    override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
        super.finishOnSuccess(affected, next)
        // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
        node.disposeOnSelect()
    }
}
392
/**
 * Select clause for sending to this channel.
 * Each read creates a new clause object that delegates to [registerSelectSend].
 */
final override val onSend: SelectClause2<E, SendChannel<E>>
    get() {
        return object : SelectClause2<E, SendChannel<E>> {
            override fun <R> registerSelectClause2(
                select: SelectInstance<R>,
                param: E,
                block: suspend (SendChannel<E>) -> R
            ) {
                registerSelectSend(select, param, block)
            }
        }
    }
399
/**
 * Registers a send of [element] with [select].
 * Loops until the select instance is already selected, a send node is enqueued,
 * or the element is offered; throws the send exception when a close token is met.
 */
private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
    while (true) {
        if (select.isSelected) return
        if (isFull) {
            val enqueueDesc = TryEnqueueSendDesc(element, select, block)
            // a null outcome means the operation went through -> nothing more to do
            val enqueueResult = select.performAtomicIfNotSelected(enqueueDesc) ?: return
            when {
                enqueueResult === ALREADY_SELECTED -> return
                enqueueResult === ENQUEUE_FAILED -> {} // retry
                enqueueResult is Closed<*> -> throw enqueueResult.sendException
                else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
            }
        } else {
            val offerResult = offerSelectInternal(element, select)
            when {
                offerResult === ALREADY_SELECTED -> return
                offerResult === OFFER_FAILED -> {} // retry
                offerResult === OFFER_SUCCESS -> {
                    block.startCoroutineUndispatched(receiver = this, completion = select.completion)
                    return
                }
                offerResult is Closed<*> -> throw offerResult.sendException
                else -> error("offerSelectInternal returned $offerResult")
            }
        }
    }
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300427
Roman Elizarove4b6f092018-03-06 13:37:42 +0300428 // ------ debug ------
429
/** Debug representation: class name, hex address, queue state, and [bufferDebugString]. */
public override fun toString(): String =
    "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"

/**
 * Describes the queue for [toString]: what kind of node is first and, when the queue
 * has more than one node, its size and the trailing close token (if any).
 */
private val queueDebugStateString: String
    get() {
        val head = queue.nextNode
        if (head === queue) return "EmptyQueue"
        val headState = when (head) {
            is Closed<*> -> head.toString()
            is Receive<*> -> "ReceiveQueued"
            is Send -> "SendQueued"
            else -> "UNEXPECTED:$head" // should not happen
        }
        val tail = queue.prevNode
        if (tail === head) return headState
        val sizePart = ",queueSize=${countQueueSize()}"
        val closedPart = if (tail is Closed<*>) ",closedForSend=$tail" else ""
        return headState + sizePart + closedPart
    }

/** Counts the nodes visited by `queue.forEach`. */
private fun countQueueSize(): Int {
    var count = 0
    queue.forEach<LockFreeLinkedListNode> { count++ }
    return count
}

/** Extra text appended to [toString]; empty by default. */
protected open val bufferDebugString: String
    get() = ""
458
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300459 // ------ private ------
460
/**
 * Queue node for a send registered through a select clause.
 * Resuming it selects the clause on [select] and starts [block] with [channel] as receiver.
 */
private class SendSelect<E, R>(
    override val pollResult: Any?,
    @JvmField val channel: SendChannel<E>,
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (SendChannel<E>) -> R
) : LockFreeLinkedListNode(), Send, DisposableHandle {
    override fun tryResumeSend(idempotent: Any?): Any? {
        // the token is handed out only when this clause wins the select
        return if (select.trySelect(idempotent)) SELECT_STARTED else null
    }

    override fun completeResumeSend(token: Any) {
        check(token === SELECT_STARTED)
        block.startCoroutine(receiver = channel, completion = select.completion)
    }

    /** Registers this node with [select] as a handle to dispose. */
    fun disposeOnSelect() {
        select.disposeOnSelect(this)
    }

    /** Disposing the handle removes this node from its list. */
    override fun dispose() {
        remove()
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        if (!select.trySelect(null)) return
        select.resumeSelectCancellableWithException(closed.sendException)
    }

    override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
}
490
/**
 * Queue node holding a buffered [element]. It has no waiting sender:
 * resuming always yields `SEND_RESUMED` and close notification is ignored.
 */
private class SendBuffered<out E>(
    @JvmField val element: E
) : LockFreeLinkedListNode(), Send {
    override val pollResult: Any?
        get() = element

    override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED

    override fun completeResumeSend(token: Any) {
        check(token === SEND_RESUMED)
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        // nothing to resume for a buffered element
    }
}
499}
500
501/**
502 * Abstract send/receive channel. It is a base class for all channel implementations.
503 */
504public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
505 // ------ extension points for buffered channels ------
506
/**
 * Must report `true` when [isBufferEmpty] is `true` at all times.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferAlwaysEmpty: Boolean

/**
 * Must report `true` while the buffer of this channel is empty.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected abstract val isBufferEmpty: Boolean
518
519 // ------ internal functions for override by buffered channels ------
520
/**
 * Tries to remove element from buffer or from queued sender.
 * This base implementation only takes the element from the first queued sender.
 * Return type is `E | POLL_FAILED | Closed`
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollInternal(): Any? {
    while (true) {
        // nothing suitable at the head of the queue -> the poll fails
        val sender = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
        // a null token means this node could not be resumed -> try the next one
        val resumeToken = sender.tryResumeSend(idempotent = null) ?: continue
        sender.completeResumeSend(resumeToken)
        return sender.pollResult
    }
}
536
/**
 * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
 * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
    val tryPoll = describeTryPoll()
    // poll atomically with select; a non-null outcome is reported to the caller as is
    select.performAtomicTrySelect(tryPoll)?.let { return it }
    val sender = tryPoll.result
    sender.completeResumeSend(tryPoll.resumeToken!!)
    return tryPoll.pollResult
}
551
552 // ------ state functions & helpers for concrete implementations ------
553
/**
 * `true` when the first queued node is a receiver or a close token.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected val hasReceiveOrClosed: Boolean
    get() = queue.nextNode is ReceiveOrClosed<*>

// ------ ReceiveChannel ------

/** `true` when a close token is first in the queue and the buffer is empty. */
public final override val isClosedForReceive: Boolean
    get() = closedForReceive != null && isBufferEmpty

/** `true` when the first queued node is not a sender and the buffer is empty. */
public final override val isEmpty: Boolean
    get() {
        if (queue.nextNode is Send) return false
        return isBufferEmpty
    }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300563
/**
 * Receives an element: first attempts a non-suspending [pollInternal] and
 * falls back to [receiveSuspend] when the poll fails.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receive(): E {
    // fast path: a poll that did not fail already holds the outcome
    val polled = pollInternal()
    if (polled !== POLL_FAILED) return receiveResult(polled)
    // slow path: suspend until an element or a close token arrives
    return receiveSuspend()
}

/**
 * Converts a poll outcome into the element, throwing the receive exception for a close token.
 */
@Suppress("UNCHECKED_CAST")
private fun receiveResult(result: Any?): E =
    if (result is Closed<*>) throw result.receiveException else result as E
578
/**
 * Slow path of [receive]: loops between enqueueing a receive node and re-trying [pollInternal]
 * until the node is enqueued, an element is polled, or a close token is observed.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
    val receiveNode = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
    while (true) {
        if (enqueueReceive(receiveNode)) {
            cont.initCancellability() // make it properly cancellable
            removeReceiveOnCancel(cont, receiveNode)
            return@sc
        }
        // hm... something is not right. try to poll
        val polled = pollInternal()
        when {
            polled is Closed<*> -> {
                cont.resumeWithException(polled.receiveException)
                return@sc
            }
            polled !== POLL_FAILED -> {
                cont.resume(polled as E)
                return@sc
            }
        }
    }
}
600
/**
 * Tries to append [receive] to the queue. The node is not added when the last node is a sender
 * or, for channels whose buffer is not always empty, when the buffer is not empty.
 * Calls `onReceiveEnqueued` after a successful addition and returns whether the node was added.
 */
private fun enqueueReceive(receive: Receive<E>): Boolean {
    val enqueued = if (isBufferAlwaysEmpty) {
        queue.addLastIfPrev(receive) { tail -> tail !is Send }
    } else {
        queue.addLastIfPrevAndIf(receive, { tail -> tail !is Send }, { isBufferEmpty })
    }
    if (enqueued) onReceiveEnqueued()
    return enqueued
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300608
/**
 * Receives an element or `null`: first attempts a non-suspending [pollInternal] and
 * falls back to [receiveOrNullSuspend] when the poll fails.
 */
@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrNull(): E? {
    // fast path: a poll that did not fail already holds the outcome
    val polled = pollInternal()
    if (polled !== POLL_FAILED) return receiveOrNullResult(polled)
    // slow path: suspend until an element or a close token arrives
    return receiveOrNullSuspend()
}

/**
 * Converts a poll outcome into the element. For a close token it throws the close cause
 * when there is one and returns `null` otherwise.
 */
@Suppress("UNCHECKED_CAST")
private fun receiveOrNullResult(result: Any?): E? {
    if (result !is Closed<*>) return result as E
    result.closeCause?.let { throw it }
    return null
}
626
/**
 * Slow path of [receiveOrNull]: loops between enqueueing a receive node and re-trying
 * [pollInternal]. A close token resumes with `null` when it has no cause and with the
 * cause as exception otherwise.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
    val receiveNode = ReceiveElement(cont, nullOnClose = true)
    while (true) {
        if (enqueueReceive(receiveNode)) {
            cont.initCancellability() // make it properly cancellable
            removeReceiveOnCancel(cont, receiveNode)
            return@sc
        }
        // hm... something is not right. try to poll
        val polled = pollInternal()
        when {
            polled is Closed<*> -> {
                if (polled.closeCause == null)
                    cont.resume(null)
                else
                    cont.resumeWithException(polled.closeCause)
                return@sc
            }
            polled !== POLL_FAILED -> {
                cont.resume(polled as E)
                return@sc
            }
        }
    }
}
651
/**
 * Polls an element without suspending.
 * Returns `null` when [pollInternal] fails; otherwise the outcome is converted
 * by [receiveOrNullResult].
 */
@Suppress("UNCHECKED_CAST")
public final override fun poll(): E? {
    val polled = pollInternal()
    if (polled === POLL_FAILED) return null
    return receiveOrNullResult(polled)
}
657
// Closes the channel with [cause], then runs the send-queue cleanup; the result of close is returned.
override fun cancel(cause: Throwable?): Boolean {
    val closeResult = close(cause)
    cleanupSendQueueOnCancel()
    return closeResult
}
662
// Note: invoked only when the channel is already closed.
// Takes queued senders one by one and resumes each with the close token,
// stopping as soon as the close token itself is reached.
protected open fun cleanupSendQueueOnCancel() {
    val closeToken = closedForSend ?: error("Cannot happen")
    while (true) {
        val head = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
        if (head !is Closed<*>) {
            head.resumeSendClosed(closeToken)
            continue
        }
        // only the close token is left -- the queue is cleaned
        check(head === closeToken)
        return
    }
}
675
// Each call creates a new iterator bound to this channel.
public final override fun iterator(): ChannelIterator<E> {
    return Itr(this)
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300677
Roman Elizarov1216e912017-02-22 09:57:06 +0300678 // ------ registerSelectReceive ------
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300679
/**
 * Creates a new [TryPollDesc] operating on this channel's [queue].
 * @suppress **This is unstable API and it is subject to change.**
 */
protected fun describeTryPoll(): TryPollDesc<E> {
    return TryPollDesc(queue)
}
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300684
/**
 * Descriptor for removing the first [Send] node from the queue.
 * Once a sender is validated, [resumeToken] holds the token it returned from `tryResumeSend`
 * and [pollResult] holds the value it offered.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
    @JvmField var resumeToken: Any? = null
    @JvmField var pollResult: E? = null

    // The close token is reported as is, any other non-sender node means POLL_FAILED,
    // and an ordinary sender is not a failure (null).
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = when (affected) {
        is Closed<*> -> affected
        !is Send -> POLL_FAILED
        else -> null
    }

    @Suppress("UNCHECKED_CAST")
    override fun validatePrepared(node: Send): Boolean {
        // this descriptor doubles as the idempotency marker for the resume attempt
        val token = node.tryResumeSend(idempotent = this)
        if (token == null) return false
        resumeToken = token
        pollResult = node.pollResult as E
        return true
    }
}
706
// Descriptor that appends a ReceiveSelect node for the given select clause to the queue.
// It yields ENQUEUE_FAILED when a sender is found in the queue or the buffer is not empty.
private inner class TryEnqueueReceiveDesc<E, R>(
    select: SelectInstance<R>,
    block: suspend (E?) -> R,
    nullOnClose: Boolean
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
    override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? =
        if (affected is Send) ENQUEUE_FAILED else null

    override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? =
        if (isBufferEmpty) super.onPrepare(affected, next) else ENQUEUE_FAILED

    override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
        super.finishOnSuccess(affected, next)
        // notify that there is one more receiver
        onReceiveEnqueued()
        // the node could actually be removed on select start, but this is also fine
        // (it will get removed if discovered there)
        node.removeOnSelectCompletion()
    }
}
730
// Select clause for receive; every access creates a new clause object that delegates to registerSelectReceive.
final override val onReceive: SelectClause1<E>
    get() {
        return object : SelectClause1<E> {
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
                registerSelectReceive(select, block)
            }
        }
    }
737
// Registers a receive clause in the given select instance.
// Loops until the select is already selected, the receiver is enqueued, or an element is polled
// and handed to [block]; a close token makes it throw the receive exception.
@Suppress("UNCHECKED_CAST")
private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
    while (!select.isSelected) {
        if (isEmpty) {
            val enqueueDesc = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
            // a null outcome means the atomic operation went through
            val outcome = select.performAtomicIfNotSelected(enqueueDesc) ?: return
            if (outcome === ALREADY_SELECTED) return
            if (outcome !== ENQUEUE_FAILED) {
                error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $outcome")
            }
            continue // ENQUEUE_FAILED -- retry
        }
        val polled = pollSelectInternal(select)
        if (polled === ALREADY_SELECTED) return
        if (polled === POLL_FAILED) continue // retry
        if (polled is Closed<*>) throw polled.receiveException
        block.startCoroutineUndispatched(polled as E, select.completion)
        return
    }
}
764
// Select clause for receive-or-null; every access creates a new clause object
// that delegates to registerSelectReceiveOrNull.
final override val onReceiveOrNull: SelectClause1<E?>
    get() {
        return object : SelectClause1<E?> {
            override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
                registerSelectReceiveOrNull(select, block)
            }
        }
    }
771
// Registers a receive-or-null clause in the given select instance.
// Works like the plain receive registration, except that a close token without a cause
// selects the clause with `null` instead of failing; a close token with a cause rethrows the cause.
@Suppress("UNCHECKED_CAST")
private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
    while (!select.isSelected) {
        if (isEmpty) {
            val enqueueDesc = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
            // a null outcome means the atomic operation went through
            val outcome = select.performAtomicIfNotSelected(enqueueDesc) ?: return
            if (outcome === ALREADY_SELECTED) return
            if (outcome !== ENQUEUE_FAILED) {
                error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $outcome")
            }
            continue // ENQUEUE_FAILED -- retry
        }
        val polled = pollSelectInternal(select)
        if (polled === ALREADY_SELECTED) return
        if (polled === POLL_FAILED) continue // retry
        if (polled is Closed<*>) {
            val cause = polled.closeCause
            if (cause != null) throw cause
            if (select.trySelect(null)) block.startCoroutineUndispatched(null, select.completion)
            return
        }
        // selected successfully
        block.startCoroutineUndispatched(polled as E, select.completion)
        return
    }
}
806
807 // ------ protected ------
808
// Delegates to the inherited implementation and additionally reports a dequeue
// whenever an actual receiver (rather than nothing or the close token) was taken.
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? {
    val taken = super.takeFirstReceiveOrPeekClosed()
    if (taken != null && taken !is Closed<*>) onReceiveDequeued()
    return taken
}
Roman Elizarov0a788392017-02-15 17:52:12 +0300813
/**
 * Hook invoked after a receiver has been successfully added to the queue of waiting receivers.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveEnqueued() {
    // no-op by default
}
819
/**
 * Hook invoked after an enqueued receiver has been successfully removed from the queue of waiting receivers.
 * The default implementation does nothing.
 * @suppress **This is unstable API and it is subject to change.**
 */
protected open fun onReceiveDequeued() {
    // no-op by default
}
Roman Elizarov0a788392017-02-15 17:52:12 +0300825
Roman Elizarov1216e912017-02-22 09:57:06 +0300826 // ------ private ------
827
// Installs a cancellation handler on [cont] that removes [receive] from the queue.
private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
    RemoveReceiveOnCancel(receive).let { onCancel ->
        cont.invokeOnCancellation(handler = onCancel.asHandler)
    }
830
// Cancellation handler: removes the receiver node from the queue and, when the removal succeeded,
// reports it through onReceiveDequeued.
private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : CancelHandler() {
    override fun invoke(cause: Throwable?) {
        val removed = receive.remove()
        if (removed) onReceiveDequeued()
    }

    override fun toString(): String {
        return "RemoveReceiveOnCancel[$receive]"
    }
}
838
// Channel iterator. `hasNext` fetches the next outcome and caches it in `result`;
// `next` hands the cached element out and resets the cache to POLL_FAILED.
private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
    var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed

    override suspend fun hasNext(): Boolean {
        // repeated hasNext -- answer from the cached outcome
        if (result !== POLL_FAILED) return hasNextResult(result)
        // fast path -- try to poll without suspending; fall back to the suspending slow path
        result = channel.pollInternal()
        return if (result === POLL_FAILED) hasNextSuspend() else hasNextResult(result)
    }

    // An element means `true`; a close token means `false`, unless it carries a cause, which is thrown.
    private fun hasNextResult(result: Any?): Boolean {
        if (result !is Closed<*>) return true
        if (result.closeCause != null) throw result.receiveException
        return false
    }

    private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
        val waiter = ReceiveHasNext(this, cont)
        while (true) {
            if (channel.enqueueReceive(waiter)) {
                // parked in the queue -- make it properly cancellable and unlink the waiter on cancellation
                cont.initCancellability()
                channel.removeReceiveOnCancel(cont, waiter)
                return@sc
            }
            // enqueue was refused, so try to take something from the channel instead
            val polled = channel.pollInternal()
            this.result = polled
            when {
                polled is Closed<*> -> {
                    if (polled.closeCause == null) {
                        cont.resume(false)
                    } else {
                        cont.resumeWithException(polled.receiveException)
                    }
                    return@sc
                }
                polled !== POLL_FAILED -> {
                    cont.resume(true)
                    return@sc
                }
            }
            // POLL_FAILED -- neither enqueued nor polled, go around again
        }
    }

    @Suppress("UNCHECKED_CAST")
    override suspend fun next(): E {
        val cached = this.result
        if (cached is Closed<*>) throw cached.receiveException
        if (cached === POLL_FAILED) {
            // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
            return channel.receive()
        }
        this.result = POLL_FAILED
        return cached as E
    }
}
897
// Waiting receiver backed by a cancellable continuation.
// With `nullOnClose` set, a close without a cause resumes the continuation with `null`;
// in every other close case it is resumed with the receive exception.
private class ReceiveElement<in E>(
    @JvmField val cont: CancellableContinuation<E?>,
    @JvmField val nullOnClose: Boolean
) : Receive<E>() {
    override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
        return cont.tryResume(value, idempotent)
    }

    override fun completeResumeReceive(token: Any) {
        cont.completeResume(token)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        val resumeWithNull = nullOnClose && closed.closeCause == null
        if (resumeWithNull) {
            cont.resume(null)
        } else {
            cont.resumeWithException(closed.receiveException)
        }
    }

    override fun toString(): String {
        return "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
    }
}
912
// Waiting receiver used by the iterator's suspending `hasNext`: resumes the continuation with
// `true`/`false` and stores the received element (or the close token) into `iterator.result`.
private class ReceiveHasNext<E>(
    @JvmField val iterator: Itr<E>,
    @JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
    override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
        val token = cont.tryResume(true, idempotent) ?: return null
        /*
           When idempotent != null this invocation can be stale, so iterator.result must not be updated here.
           Instead, both the token and the value are packed into a temporary IdempotentTokenValue object and
           iterator.result is set only in completeResumeReceive, which is going to be invoked just once.
         */
        if (idempotent != null) return IdempotentTokenValue(token, value)
        iterator.result = value
        return token
    }

    override fun completeResumeReceive(token: Any) {
        if (token !is IdempotentTokenValue<*>) {
            cont.completeResume(token)
            return
        }
        // unpack what tryResumeReceive deferred
        iterator.result = token.value
        cont.completeResume(token.token)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        val token =
            if (closed.closeCause == null) cont.tryResume(false)
            else cont.tryResumeWithException(closed.receiveException)
        if (token == null) return
        iterator.result = closed
        cont.completeResume(token)
    }

    override fun toString(): String {
        return "ReceiveHasNext[$cont]"
    }
}
951
// Waiting receiver registered on behalf of a select clause. It is also a DisposableHandle,
// so that the select instance can remove it from the queue when the select completes.
private inner class ReceiveSelect<R, in E>(
    @JvmField val select: SelectInstance<R>,
    @JvmField val block: suspend (E?) -> R,
    @JvmField val nullOnClose: Boolean
) : Receive<E>(), DisposableHandle {
    override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
        if (!select.trySelect(idempotent)) return null
        // the element itself serves as the token; a null element is replaced with NULL_VALUE
        return value ?: NULL_VALUE
    }

    @Suppress("UNCHECKED_CAST")
    override fun completeResumeReceive(token: Any) {
        // undo the NULL_VALUE substitution made in tryResumeReceive
        val element: E = (if (token === NULL_VALUE) null else token) as E
        block.startCoroutine(element, select.completion)
    }

    override fun resumeReceiveClosed(closed: Closed<*>) {
        if (!select.trySelect(null)) return
        if (nullOnClose && closed.closeCause == null) {
            block.startCoroutine(null, select.completion)
            return
        }
        // even though we are dispatching coroutine to process channel close on receive,
        // which is an atomically cancellable suspending function,
        // close is a final state, so we can use a cancellable resume mode
        select.resumeSelectCancellableWithException(closed.receiveException)
    }

    fun removeOnSelectCompletion() {
        select.disposeOnSelect(this)
    }

    // invoked on select completion
    override fun dispose() {
        val removed = remove()
        if (removed) onReceiveDequeued() // notify cancellation of receive
    }

    override fun toString(): String {
        return "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
    }
}
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300990
// Pairs a resume token with the received value, so that the value can be stored
// later, when the resume is completed, rather than when it is attempted.
private class IdempotentTokenValue<out E>(
    @JvmField val token: Any,
    @JvmField val value: E
)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300995}
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300996
/**
 * Value of `offerResult` reported by an ordinary waiting receiver.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")

/**
 * Marker symbol for a failed offer (NOTE(review): its users are outside this part of the file).
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")

/**
 * Poll outcome meaning that neither an element nor a close token was taken.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val POLL_FAILED: Any = Symbol("POLL_FAILED")

/**
 * Outcome of an attempt to enqueue a receiver that did not go through and has to be retried.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")

/**
 * Marker symbol related to select registration (NOTE(review): its users are outside this part of the file).
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")

/**
 * Stand-in for a `null` element where a non-null resume token is required.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val NULL_VALUE: Any = Symbol("NULL_VALUE")

/**
 * Resume token returned by the close token from its `tryResumeSend` and `tryResumeReceive`.
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")

/**
 * Marker symbol for a resumed send (NOTE(review): its users are outside this part of the file).
 * @suppress **This is unstable API and it is subject to change.**
 */
@JvmField internal val SEND_RESUMED = Symbol("SEND_RESUMED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +03001020
/** Callback type that accepts an optional [Throwable]. */
internal typealias Handler = (Throwable?) -> Unit

/**
 * Marker object named after an already-invoked [Handler]
 * (NOTE(review): its users are outside this part of the file).
 *
 * It is a named [Symbol], like the other markers in this file, so that it prints as
 * `HANDLER_INVOKED` in debug output instead of an anonymous `java.lang.Object@...`.
 * The declared type stays [Any], and the marker is still a unique instance for identity comparisons.
 */
@JvmField internal val HANDLER_INVOKED: Any = Symbol("HANDLER_INVOKED")
1023
/**
 * A sender waiting in the channel queue.
 * @suppress **This is unstable API and it is subject to change.**
 */
public interface Send {
    /** What a poll obtains from this sender: `E | Closed`. */
    val pollResult: Any?

    /** Attempts to resume this sender; a non-null result is a token for [completeResumeSend], `null` means failure. */
    fun tryResumeSend(idempotent: Any?): Any?

    /** Finishes the resume started by [tryResumeSend] using the [token] it returned. */
    fun completeResumeSend(token: Any)

    /** Resumes this sender in response to the channel being [closed]. */
    fun resumeSendClosed(closed: Closed<*>)
}
1034
/**
 * Either a receiver waiting in the channel queue or the closed token.
 * @suppress **This is unstable API and it is subject to change.**
 */
public interface ReceiveOrClosed<in E> {
    /** What an offer obtains from this node: `OFFER_SUCCESS | Closed`. */
    val offerResult: Any

    /** Attempts to hand [value] over; a non-null result is a token for [completeResumeReceive]. */
    fun tryResumeReceive(value: E, idempotent: Any?): Any?

    /** Finishes the resume started by [tryResumeReceive] using the [token] it returned. */
    fun completeResumeReceive(token: Any)
}
1044
/**
 * A waiting sender of one specific element: [pollResult] is the element being sent and
 * [cont] is the continuation of the suspended sender.
 * @suppress **This is unstable API and it is subject to change.**
 */
@Suppress("UNCHECKED_CAST")
public class SendElement(
    override val pollResult: Any?,
    @JvmField val cont: CancellableContinuation<Unit>
) : LockFreeLinkedListNode(), Send {
    override fun tryResumeSend(idempotent: Any?): Any? {
        return cont.tryResume(Unit, idempotent)
    }

    override fun completeResumeSend(token: Any) {
        cont.completeResume(token)
    }

    // a sender caught by a close is resumed with the send exception of the close token
    override fun resumeSendClosed(closed: Closed<*>) {
        cont.resumeWithException(closed.sendException)
    }

    override fun toString(): String {
        return "SendElement($pollResult)[$cont]"
    }
}
1059
/**
 * The closed token of a channel. It acts both as a [Send] and as a [ReceiveOrClosed] node and
 * reports itself as the result of a poll or an offer.
 * @suppress **This is unstable API and it is subject to change.**
 */
public class Closed<in E>(
    @JvmField val closeCause: Throwable?
) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
    /** The close cause, or a new [ClosedSendChannelException] when the channel was closed without one. */
    val sendException: Throwable
        get() {
            return closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
        }

    /** The close cause, or a new [ClosedReceiveChannelException] when the channel was closed without one. */
    val receiveException: Throwable
        get() {
            return closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
        }

    override val offerResult get() = this
    override val pollResult get() = this

    // resuming the closed token always succeeds, with the fixed CLOSE_RESUMED token
    override fun tryResumeSend(idempotent: Any?): Any? {
        return CLOSE_RESUMED
    }

    override fun completeResumeSend(token: Any) {
        check(token === CLOSE_RESUMED)
    }

    override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
        return CLOSE_RESUMED
    }

    override fun completeResumeReceive(token: Any) {
        check(token === CLOSE_RESUMED)
    }

    override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")

    override fun toString(): String {
        return "Closed[$closeCause]"
    }
}
1079
// Base class for receivers waiting in the queue: an offer to such a node reports OFFER_SUCCESS,
// and every subclass defines how the receiver is resumed when the channel gets closed.
private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
    override val offerResult: Any
        get() = OFFER_SUCCESS

    abstract fun resumeReceiveClosed(closed: Closed<*>)
}