blob: 40a803493c4c958fb83af82024ef3d5384733497 [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarovf16fd272017-02-07 11:26:00 +03003 */
4
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03005package kotlinx.coroutines.experimental.channels
6
Roman Elizarove4b6f092018-03-06 13:37:42 +03007import kotlinx.coroutines.experimental.*
Roman Elizarov1216e912017-02-22 09:57:06 +03008import kotlinx.coroutines.experimental.internal.*
Vsevolod Tolstopyatov96191342018-04-20 18:13:33 +03009import kotlinx.coroutines.experimental.internalAnnotations.*
Roman Elizarove4b6f092018-03-06 13:37:42 +030010import kotlinx.coroutines.experimental.intrinsics.*
11import kotlinx.coroutines.experimental.selects.*
12import kotlin.coroutines.experimental.*
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030013
14/**
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030015 * Abstract send channel. It is a base class for all send channel implementations.
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030016 */
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030017public abstract class AbstractSendChannel<E> : SendChannel<E> {
18 /** @suppress **This is unstable API and it is subject to change.** */
19 protected val queue = LockFreeLinkedListHead()
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030020
21 // ------ extension points for buffered channels ------
22
Roman Elizarovf138bbc2017-02-09 19:13:08 +030023 /**
Roman Elizarov2ad0e942017-02-28 19:14:08 +030024 * Returns `true` if [isBufferFull] is always `true`.
25 * @suppress **This is unstable API and it is subject to change.**
26 */
27 protected abstract val isBufferAlwaysFull: Boolean
28
29 /**
Roman Elizarovf138bbc2017-02-09 19:13:08 +030030 * Returns `true` if this channel's buffer is full.
Roman Elizarov2ad0e942017-02-28 19:14:08 +030031 * @suppress **This is unstable API and it is subject to change.**
Roman Elizarovf138bbc2017-02-09 19:13:08 +030032 */
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030033 protected abstract val isBufferFull: Boolean
34
Roman Elizarov2ad0e942017-02-28 19:14:08 +030035 // ------ internal functions for override by buffered channels ------
36
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030037 /**
38 * Tries to add element to buffer or to queued receiver.
Roman Elizarovf6fed2a2017-02-03 19:12:09 +030039 * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
Roman Elizarov2ad0e942017-02-28 19:14:08 +030040 * @suppress **This is unstable API and it is subject to change.**
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030041 */
Roman Elizarov2ad0e942017-02-28 19:14:08 +030042 protected open fun offerInternal(element: E): Any {
43 while (true) {
44 val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
45 val token = receive.tryResumeReceive(element, idempotent = null)
46 if (token != null) {
47 receive.completeResumeReceive(token)
48 return receive.offerResult
49 }
50 }
51 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030052
53 /**
Roman Elizarov1216e912017-02-22 09:57:06 +030054 * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
55 * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
Roman Elizarov2ad0e942017-02-28 19:14:08 +030056 * @suppress **This is unstable API and it is subject to change.**
Roman Elizarov1216e912017-02-22 09:57:06 +030057 */
Roman Elizarov2ad0e942017-02-28 19:14:08 +030058 protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
59 // offer atomically with select
60 val offerOp = describeTryOffer(element)
61 val failure = select.performAtomicTrySelect(offerOp)
62 if (failure != null) return failure
63 val receive = offerOp.result
64 receive.completeResumeReceive(offerOp.resumeToken!!)
65 return receive.offerResult
66 }
Roman Elizarov1216e912017-02-22 09:57:06 +030067
Roman Elizarov2ad0e942017-02-28 19:14:08 +030068 // ------ state functions & helpers for concrete implementations ------
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030069
Roman Elizarovf138bbc2017-02-09 19:13:08 +030070 /**
Roman Elizarovf138bbc2017-02-09 19:13:08 +030071 * Returns non-null closed token if it is last in the queue.
Roman Elizarov2ad0e942017-02-28 19:14:08 +030072 * @suppress **This is unstable API and it is subject to change.**
Roman Elizarovf138bbc2017-02-09 19:13:08 +030073 */
Roman Elizarov0406a9b2018-04-26 12:35:43 +030074 protected val closedForSend: Closed<*>? get() = queue.prevNode as? Closed<*>
Roman Elizarov7b2d8b02017-02-02 20:09:14 +030075
Roman Elizarov2ad0e942017-02-28 19:14:08 +030076 /**
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030077 * Returns non-null closed token if it is first in the queue.
Roman Elizarov2ad0e942017-02-28 19:14:08 +030078 * @suppress **This is unstable API and it is subject to change.**
79 */
Roman Elizarov0406a9b2018-04-26 12:35:43 +030080 protected val closedForReceive: Closed<*>? get() = queue.nextNode as? Closed<*>
Roman Elizarove3aa8ff2017-04-27 19:16:40 +030081
82 /**
83 * Retrieves first sending waiter from the queue or returns closed token.
84 * @suppress **This is unstable API and it is subject to change.**
85 */
86 protected fun takeFirstSendOrPeekClosed(): Send? =
87 queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
Roman Elizarov2ad0e942017-02-28 19:14:08 +030088
89 /**
Roman Elizarove6e8ce82017-06-05 17:04:39 +030090 * Queues buffered element, returns null on success or
91 * returns node reference if it was already closed or is waiting for receive.
Roman Elizarov2ad0e942017-02-28 19:14:08 +030092 * @suppress **This is unstable API and it is subject to change.**
93 */
Roman Elizarove6e8ce82017-06-05 17:04:39 +030094 protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
95 queue.addLastIfPrev(SendBuffered(element), { prev ->
96 if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
97 true
98 })
99 return null
100 }
101
102 /**
103 * Queues conflated element, returns null on success or
104 * returns node reference if it was already closed or is waiting for receive.
105 * @suppress **This is unstable API and it is subject to change.**
106 */
107 protected fun sendConflated(element: E): ReceiveOrClosed<*>? {
108 val node = SendBuffered(element)
109 queue.addLastIfPrev(node, { prev ->
110 if (prev is ReceiveOrClosed<*>) return@sendConflated prev
111 true
112 })
113 conflatePreviousSendBuffered(node)
114 return null
115 }
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300116
117 /**
118 * @suppress **This is unstable API and it is subject to change.**
119 */
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300120 protected fun conflatePreviousSendBuffered(node: LockFreeLinkedListNode) {
Roman Elizarov0406a9b2018-04-26 12:35:43 +0300121 val prev = node.prevNode
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300122 (prev as? SendBuffered<*>)?.remove()
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300123 }
124
125 /**
126 * @suppress **This is unstable API and it is subject to change.**
127 */
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300128 protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
129
Roman Elizarovaa461cf2018-04-11 13:20:29 +0300130 private open class SendBufferedDesc<E>(
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300131 queue: LockFreeLinkedListHead,
132 element: E
133 ) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
134 override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
135 if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
136 return null
137 }
138 }
139
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300140 /**
141 * @suppress **This is unstable API and it is subject to change.**
142 */
143 protected fun describeSendConflated(element: E): AddLastDesc<*> = SendConflatedDesc(queue, element)
144
Roman Elizarovaa461cf2018-04-11 13:20:29 +0300145 private class SendConflatedDesc<E>(
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300146 queue: LockFreeLinkedListHead,
147 element: E
148 ) : SendBufferedDesc<E>(queue, element) {
149 override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
150 super.finishOnSuccess(affected, next)
151 // remove previous SendBuffered
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300152 (affected as? SendBuffered<*>)?.remove()
Roman Elizarov3aed4ee2017-03-06 12:21:05 +0300153 }
154 }
155
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300156 // ------ SendChannel ------
157
Roman Elizarov0a788392017-02-15 17:52:12 +0300158 public final override val isClosedForSend: Boolean get() = closedForSend != null
Roman Elizarov0406a9b2018-04-26 12:35:43 +0300159 public final override val isFull: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300160
Roman Elizarov0a788392017-02-15 17:52:12 +0300161 public final override suspend fun send(element: E) {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300162 // fast path -- try offer non-blocking
163 if (offer(element)) return
164 // slow-path does suspend
165 return sendSuspend(element)
166 }
167
Roman Elizarov0a788392017-02-15 17:52:12 +0300168 public final override fun offer(element: E): Boolean {
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300169 val result = offerInternal(element)
170 return when {
171 result === OFFER_SUCCESS -> true
Roman Elizarov1216e912017-02-22 09:57:06 +0300172 result === OFFER_FAILED -> false
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300173 result is Closed<*> -> throw result.sendException
Roman Elizarov1216e912017-02-22 09:57:06 +0300174 else -> error("offerInternal returned $result")
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300175 }
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300176 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300177
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300178 private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
Roman Elizarov1216e912017-02-22 09:57:06 +0300179 val send = SendElement(element, cont)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300180 loop@ while (true) {
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300181 val enqueueResult = enqueueSend(send)
182 when (enqueueResult) {
183 null -> { // enqueued successfully
184 cont.initCancellability() // make it properly cancellable
Vsevolod Tolstopyatov80a29472018-04-17 16:02:02 +0300185 cont.removeOnCancellation(send)
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300186 return@sc
187 }
188 is Closed<*> -> {
189 cont.resumeWithException(enqueueResult.sendException)
190 return@sc
191 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300192 }
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300193 // hm... receiver is waiting or buffer is not full. try to offer
194 val offerResult = offerInternal(element)
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300195 when {
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300196 offerResult === OFFER_SUCCESS -> {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300197 cont.resume(Unit)
198 return@sc
199 }
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300200 offerResult === OFFER_FAILED -> continue@loop
201 offerResult is Closed<*> -> {
202 cont.resumeWithException(offerResult.sendException)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300203 return@sc
204 }
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300205 else -> error("offerInternal returned $offerResult")
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300206 }
207 }
208 }
209
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300210 /**
211 * Result is:
212 * * null -- successfully enqueued
213 * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
214 * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
215 */
216 private fun enqueueSend(send: SendElement): Any? {
217 if (isBufferAlwaysFull) {
218 queue.addLastIfPrev(send, { prev ->
219 if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
220 true
221 })
222 } else {
223 if (!queue.addLastIfPrevAndIf(send, { prev ->
224 if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
225 true
226 }, { isBufferFull }))
227 return ENQUEUE_FAILED
228 }
229 return null
230 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300231
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300232 public override fun close(cause: Throwable?): Boolean {
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300233 val closed = Closed<E>(cause)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300234 while (true) {
235 val receive = takeFirstReceiveOrPeekClosed()
236 if (receive == null) {
237 // queue empty or has only senders -- try add last "Closed" item to the queue
Roman Elizarov98b7a6e2017-06-07 08:43:43 +0300238 if (queue.addLastIfPrev(closed, { prev ->
239 if (prev is Closed<*>) return false // already closed
240 prev !is ReceiveOrClosed<*> // only add close if no waiting receive
241 })) {
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300242 onClosed(closed)
Roman Elizarov0a788392017-02-15 17:52:12 +0300243 afterClose(cause)
244 return true
245 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300246 continue // retry on failure
247 }
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300248 if (receive is Closed<*>) return false // already marked as closed -- nothing to do
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300249 receive as Receive<E> // type assertion
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300250 receive.resumeReceiveClosed(closed)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300251 }
252 }
253
Roman Elizarovf138bbc2017-02-09 19:13:08 +0300254 /**
Roman Elizarove6e8ce82017-06-05 17:04:39 +0300255 * Invoked when [Closed] element was just added.
256 * @suppress **This is unstable API and it is subject to change.**
257 */
258 protected open fun onClosed(closed: Closed<E>) {}
259
260 /**
Roman Elizarov0a788392017-02-15 17:52:12 +0300261 * Invoked after successful [close].
262 */
263 protected open fun afterClose(cause: Throwable?) {}
264
265 /**
Roman Elizarovf138bbc2017-02-09 19:13:08 +0300266 * Retrieves first receiving waiter from the queue or returns closed token.
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300267 * @suppress **This is unstable API and it is subject to change.**
Roman Elizarovf138bbc2017-02-09 19:13:08 +0300268 */
Roman Elizarov3e342e32018-01-13 20:05:51 +0300269 protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
Roman Elizarov1216e912017-02-22 09:57:06 +0300270 queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
271
272 // ------ registerSelectSend ------
273
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300274 /**
275 * @suppress **This is unstable API and it is subject to change.**
276 */
Roman Elizarov1216e912017-02-22 09:57:06 +0300277 protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
278
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300279 /**
280 * @suppress **This is unstable API and it is subject to change.**
281 */
Roman Elizarov1216e912017-02-22 09:57:06 +0300282 protected class TryOfferDesc<E>(
283 @JvmField val element: E,
284 queue: LockFreeLinkedListHead
285 ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
286 @JvmField var resumeToken: Any? = null
287
288 override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
289 if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
290 if (affected is Closed<*>) return affected
291 return null
292 }
293
294 override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
295 val token = node.tryResumeReceive(element, idempotent = this) ?: return false
296 resumeToken = token
297 return true
298 }
299 }
300
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300301 private inner class TryEnqueueSendDesc<R>(
Roman Elizarov1216e912017-02-22 09:57:06 +0300302 element: E,
303 select: SelectInstance<R>,
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300304 block: suspend (SendChannel<E>) -> R
305 ) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300306 override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
307 if (affected is ReceiveOrClosed<*>) {
308 return affected as? Closed<*> ?: ENQUEUE_FAILED
309 }
310 return null
311 }
312
313 override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
314 if (!isBufferFull) return ENQUEUE_FAILED
315 return super.onPrepare(affected, next)
316 }
317
318 override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
319 super.finishOnSuccess(affected, next)
320 // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
Roman Elizarovdaa1d9d2017-03-02 19:00:50 +0300321 node.disposeOnSelect()
Roman Elizarov1216e912017-02-22 09:57:06 +0300322 }
323 }
324
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300325 final override val onSend: SelectClause2<E, SendChannel<E>>
326 get() = object : SelectClause2<E, SendChannel<E>> {
327 override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
328 registerSelectSend(select, param, block)
329 }
330 }
331
332 private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300333 while (true) {
334 if (select.isSelected) return
335 if (isFull) {
336 val enqueueOp = TryEnqueueSendDesc(element, select, block)
337 val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
338 when {
339 enqueueResult === ALREADY_SELECTED -> return
340 enqueueResult === ENQUEUE_FAILED -> {} // retry
341 enqueueResult is Closed<*> -> throw enqueueResult.sendException
342 else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
343 }
344 } else {
345 val offerResult = offerSelectInternal(element, select)
346 when {
347 offerResult === ALREADY_SELECTED -> return
348 offerResult === OFFER_FAILED -> {} // retry
349 offerResult === OFFER_SUCCESS -> {
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300350 block.startCoroutineUndispatched(receiver = this, completion = select.completion)
Roman Elizarov1216e912017-02-22 09:57:06 +0300351 return
352 }
353 offerResult is Closed<*> -> throw offerResult.sendException
354 else -> error("offerSelectInternal returned $offerResult")
355 }
356 }
357 }
358 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300359
Roman Elizarove4b6f092018-03-06 13:37:42 +0300360 // ------ debug ------
361
362 public override fun toString() =
363 "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
364
365 private val queueDebugStateString: String
366 get() {
Roman Elizarov0406a9b2018-04-26 12:35:43 +0300367 val head = queue.nextNode
Roman Elizarove4b6f092018-03-06 13:37:42 +0300368 if (head === queue) return "EmptyQueue"
369 var result = when (head) {
370 is Closed<*> -> head.toString()
371 is Receive<*> -> "ReceiveQueued"
372 is Send -> "SendQueued"
373 else -> "UNEXPECTED:$head" // should not happen
374 }
Roman Elizarov0406a9b2018-04-26 12:35:43 +0300375 val tail = queue.prevNode
Roman Elizarove4b6f092018-03-06 13:37:42 +0300376 if (tail !== head) {
377 result += ",queueSize=${countQueueSize()}"
378 if (tail is Closed<*>) result += ",closedForSend=$tail"
379 }
380 return result
381 }
382
383 private fun countQueueSize(): Int {
384 var size = 0
385 queue.forEach<LockFreeLinkedListNode> { size++ }
386 return size
387 }
388
389 protected open val bufferDebugString: String get() = ""
390
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300391 // ------ private ------
392
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300393 private class SendSelect<E, R>(
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300394 override val pollResult: Any?,
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300395 @JvmField val channel: SendChannel<E>,
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300396 @JvmField val select: SelectInstance<R>,
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300397 @JvmField val block: suspend (SendChannel<E>) -> R
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300398 ) : LockFreeLinkedListNode(), Send, DisposableHandle {
399 override fun tryResumeSend(idempotent: Any?): Any? =
400 if (select.trySelect(idempotent)) SELECT_STARTED else null
401
402 override fun completeResumeSend(token: Any) {
403 check(token === SELECT_STARTED)
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300404 block.startCoroutine(receiver = channel, completion = select.completion)
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300405 }
406
407 fun disposeOnSelect() {
408 select.disposeOnSelect(this)
409 }
410
411 override fun dispose() {
412 remove()
413 }
414
Roman Elizarovb555d912017-08-17 21:01:33 +0300415 override fun resumeSendClosed(closed: Closed<*>) {
416 if (select.trySelect(null))
417 select.resumeSelectCancellableWithException(closed.sendException)
418 }
419
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300420 override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300421 }
422
423 private class SendBuffered<out E>(
424 @JvmField val element: E
425 ) : LockFreeLinkedListNode(), Send {
426 override val pollResult: Any? get() = element
427 override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
428 override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
Roman Elizarovb555d912017-08-17 21:01:33 +0300429 override fun resumeSendClosed(closed: Closed<*>) {}
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300430 }
431}
432
433/**
434 * Abstract send/receive channel. It is a base class for all channel implementations.
435 */
436public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
437 // ------ extension points for buffered channels ------
438
439 /**
440 * Returns `true` if [isBufferEmpty] is always `true`.
441 * @suppress **This is unstable API and it is subject to change.**
442 */
443 protected abstract val isBufferAlwaysEmpty: Boolean
444
445 /**
446 * Returns `true` if this channel's buffer is empty.
447 * @suppress **This is unstable API and it is subject to change.**
448 */
449 protected abstract val isBufferEmpty: Boolean
450
451 // ------ internal functions for override by buffered channels ------
452
453 /**
454 * Tries to remove element from buffer or from queued sender.
455 * Return type is `E | POLL_FAILED | Closed`
456 * @suppress **This is unstable API and it is subject to change.**
457 */
458 protected open fun pollInternal(): Any? {
459 while (true) {
460 val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
461 val token = send.tryResumeSend(idempotent = null)
462 if (token != null) {
463 send.completeResumeSend(token)
464 return send.pollResult
465 }
466 }
467 }
468
469 /**
470 * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
471 * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
472 * @suppress **This is unstable API and it is subject to change.**
473 */
474 protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
475 // poll atomically with select
476 val pollOp = describeTryPoll()
477 val failure = select.performAtomicTrySelect(pollOp)
478 if (failure != null) return failure
479 val send = pollOp.result
480 send.completeResumeSend(pollOp.resumeToken!!)
481 return pollOp.pollResult
482 }
483
484 // ------ state functions & helpers for concrete implementations ------
485
486 /**
487 * @suppress **This is unstable API and it is subject to change.**
488 */
Roman Elizarov0406a9b2018-04-26 12:35:43 +0300489 protected val hasReceiveOrClosed: Boolean get() = queue.nextNode is ReceiveOrClosed<*>
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300490
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300491 // ------ ReceiveChannel ------
492
Roman Elizarov0a788392017-02-15 17:52:12 +0300493 public final override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
Roman Elizarov0406a9b2018-04-26 12:35:43 +0300494 public final override val isEmpty: Boolean get() = queue.nextNode !is Send && isBufferEmpty
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300495
496 @Suppress("UNCHECKED_CAST")
Roman Elizarov0a788392017-02-15 17:52:12 +0300497 public final override suspend fun receive(): E {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300498 // fast path -- try poll non-blocking
499 val result = pollInternal()
Roman Elizarov1216e912017-02-22 09:57:06 +0300500 if (result !== POLL_FAILED) return receiveResult(result)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300501 // slow-path does suspend
502 return receiveSuspend()
503 }
504
505 @Suppress("UNCHECKED_CAST")
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300506 private fun receiveResult(result: Any?): E {
507 if (result is Closed<*>) throw result.receiveException
508 return result as E
509 }
510
511 @Suppress("UNCHECKED_CAST")
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300512 private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
Roman Elizarov1216e912017-02-22 09:57:06 +0300513 val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300514 while (true) {
515 if (enqueueReceive(receive)) {
516 cont.initCancellability() // make it properly cancellable
Roman Elizarov0a788392017-02-15 17:52:12 +0300517 removeReceiveOnCancel(cont, receive)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300518 return@sc
519 }
520 // hm... something is not right. try to poll
521 val result = pollInternal()
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300522 if (result is Closed<*>) {
523 cont.resumeWithException(result.receiveException)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300524 return@sc
525 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300526 if (result !== POLL_FAILED) {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300527 cont.resume(result as E)
528 return@sc
529 }
530 }
531 }
532
Roman Elizarov0a788392017-02-15 17:52:12 +0300533 private fun enqueueReceive(receive: Receive<E>): Boolean {
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300534 val result = if (isBufferAlwaysEmpty)
535 queue.addLastIfPrev(receive, { it !is Send }) else
536 queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
Roman Elizarov3e342e32018-01-13 20:05:51 +0300537 if (result) onReceiveEnqueued()
Roman Elizarov0a788392017-02-15 17:52:12 +0300538 return result
539 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300540
541 @Suppress("UNCHECKED_CAST")
Roman Elizarov0a788392017-02-15 17:52:12 +0300542 public final override suspend fun receiveOrNull(): E? {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300543 // fast path -- try poll non-blocking
544 val result = pollInternal()
Roman Elizarov1216e912017-02-22 09:57:06 +0300545 if (result !== POLL_FAILED) return receiveOrNullResult(result)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300546 // slow-path does suspend
547 return receiveOrNullSuspend()
548 }
549
550 @Suppress("UNCHECKED_CAST")
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300551 private fun receiveOrNullResult(result: Any?): E? {
552 if (result is Closed<*>) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300553 if (result.closeCause != null) throw result.closeCause
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300554 return null
555 }
556 return result as E
557 }
558
559 @Suppress("UNCHECKED_CAST")
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300560 private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
Roman Elizarov1216e912017-02-22 09:57:06 +0300561 val receive = ReceiveElement(cont, nullOnClose = true)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300562 while (true) {
563 if (enqueueReceive(receive)) {
564 cont.initCancellability() // make it properly cancellable
Roman Elizarov0a788392017-02-15 17:52:12 +0300565 removeReceiveOnCancel(cont, receive)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300566 return@sc
567 }
568 // hm... something is not right. try to poll
569 val result = pollInternal()
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300570 if (result is Closed<*>) {
571 if (result.closeCause == null)
572 cont.resume(null)
573 else
Roman Elizarov1216e912017-02-22 09:57:06 +0300574 cont.resumeWithException(result.closeCause)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300575 return@sc
576 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300577 if (result !== POLL_FAILED) {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300578 cont.resume(result as E)
579 return@sc
580 }
581 }
582 }
583
584 @Suppress("UNCHECKED_CAST")
Roman Elizarov0a788392017-02-15 17:52:12 +0300585 public final override fun poll(): E? {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300586 val result = pollInternal()
Roman Elizarov1216e912017-02-22 09:57:06 +0300587 return if (result === POLL_FAILED) null else receiveOrNullResult(result)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300588 }
589
Roman Elizarovb555d912017-08-17 21:01:33 +0300590 override fun cancel(cause: Throwable?): Boolean =
591 close(cause).also {
592 cleanupSendQueueOnCancel()
593 }
594
595 // Note: this function is invoked when channel is already closed
596 protected open fun cleanupSendQueueOnCancel() {
597 val closed = closedForSend ?: error("Cannot happen")
598 while (true) {
599 val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
600 if (send is Closed<*>) {
601 check(send === closed)
602 return // cleaned
603 }
604 send.resumeSendClosed(closed)
605 }
606 }
607
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300608 public final override fun iterator(): ChannelIterator<E> = Itr(this)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300609
Roman Elizarov1216e912017-02-22 09:57:06 +0300610 // ------ registerSelectReceive ------
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300611
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300612 /**
613 * @suppress **This is unstable API and it is subject to change.**
614 */
Roman Elizarov1216e912017-02-22 09:57:06 +0300615 protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300616
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300617 /**
618 * @suppress **This is unstable API and it is subject to change.**
619 */
Roman Elizarov1216e912017-02-22 09:57:06 +0300620 protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
621 @JvmField var resumeToken: Any? = null
622 @JvmField var pollResult: E? = null
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300623
Roman Elizarov1216e912017-02-22 09:57:06 +0300624 override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
625 if (affected is Closed<*>) return affected
626 if (affected !is Send) return POLL_FAILED
627 return null
Roman Elizarov0a788392017-02-15 17:52:12 +0300628 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300629
630 @Suppress("UNCHECKED_CAST")
631 override fun validatePrepared(node: Send): Boolean {
Roman Elizarov932e8602017-06-21 17:21:37 +0300632 val token = node.tryResumeSend(idempotent = this) ?: return false
Roman Elizarov1216e912017-02-22 09:57:06 +0300633 resumeToken = token
634 pollResult = node.pollResult as E
635 return true
636 }
637 }
638
Roman Elizarovaa461cf2018-04-11 13:20:29 +0300639 private inner class TryEnqueueReceiveDesc<E, R>(
Roman Elizarov1216e912017-02-22 09:57:06 +0300640 select: SelectInstance<R>,
641 block: suspend (E?) -> R,
642 nullOnClose: Boolean
Roman Elizarov174c6962017-02-28 17:36:51 +0300643 ) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300644 override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
645 if (affected is Send) return ENQUEUE_FAILED
646 return null
647 }
648
649 override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
650 if (!isBufferEmpty) return ENQUEUE_FAILED
651 return super.onPrepare(affected, next)
652 }
653
654 override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
655 super.finishOnSuccess(affected, next)
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300656 // notify the there is one more receiver
Roman Elizarov3e342e32018-01-13 20:05:51 +0300657 onReceiveEnqueued()
Roman Elizarov1216e912017-02-22 09:57:06 +0300658 // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300659 node.removeOnSelectCompletion()
Roman Elizarov1216e912017-02-22 09:57:06 +0300660 }
661 }
662
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300663 final override val onReceive: SelectClause1<E>
664 get() = object : SelectClause1<E> {
665 override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
666 registerSelectReceive(select, block)
667 }
668 }
669
Roman Elizarov1216e912017-02-22 09:57:06 +0300670 @Suppress("UNCHECKED_CAST")
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300671 private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300672 while (true) {
673 if (select.isSelected) return
674 if (isEmpty) {
675 val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
676 val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
677 when {
678 enqueueResult === ALREADY_SELECTED -> return
679 enqueueResult === ENQUEUE_FAILED -> {} // retry
680 else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
681 }
682 } else {
683 val pollResult = pollSelectInternal(select)
684 when {
685 pollResult === ALREADY_SELECTED -> return
686 pollResult === POLL_FAILED -> {} // retry
687 pollResult is Closed<*> -> throw pollResult.receiveException
688 else -> {
Roman Elizarov4638d792017-03-14 19:39:26 +0300689 block.startCoroutineUndispatched(pollResult as E, select.completion)
Roman Elizarov1216e912017-02-22 09:57:06 +0300690 return
691 }
692 }
693 }
694 }
695 }
696
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300697 final override val onReceiveOrNull: SelectClause1<E?>
698 get() = object : SelectClause1<E?> {
699 override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
700 registerSelectReceiveOrNull(select, block)
701 }
702 }
703
Roman Elizarov1216e912017-02-22 09:57:06 +0300704 @Suppress("UNCHECKED_CAST")
Roman Elizarovdb0e22d2017-08-29 18:15:57 +0300705 private fun <R> registerSelectReceiveOrNull(select: SelectInstance<R>, block: suspend (E?) -> R) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300706 while (true) {
707 if (select.isSelected) return
708 if (isEmpty) {
709 val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
710 val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
711 when {
712 enqueueResult === ALREADY_SELECTED -> return
713 enqueueResult === ENQUEUE_FAILED -> {} // retry
714 else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
715 }
716 } else {
717 val pollResult = pollSelectInternal(select)
718 when {
719 pollResult === ALREADY_SELECTED -> return
720 pollResult === POLL_FAILED -> {} // retry
721 pollResult is Closed<*> -> {
722 if (pollResult.closeCause == null) {
Roman Elizarov932e8602017-06-21 17:21:37 +0300723 if (select.trySelect(null))
Roman Elizarov4638d792017-03-14 19:39:26 +0300724 block.startCoroutineUndispatched(null, select.completion)
Roman Elizarov1216e912017-02-22 09:57:06 +0300725 return
726 } else
727 throw pollResult.closeCause
728 }
729 else -> {
730 // selected successfully
Roman Elizarov4638d792017-03-14 19:39:26 +0300731 block.startCoroutineUndispatched(pollResult as E, select.completion)
Roman Elizarov1216e912017-02-22 09:57:06 +0300732 return
733 }
734 }
735 }
736 }
737 }
738
739 // ------ protected ------
740
Roman Elizarov3e342e32018-01-13 20:05:51 +0300741 override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
742 super.takeFirstReceiveOrPeekClosed().also {
743 if (it != null && it !is Closed<*>) onReceiveDequeued()
744 }
Roman Elizarov0a788392017-02-15 17:52:12 +0300745
746 /**
Roman Elizarov3e342e32018-01-13 20:05:51 +0300747 * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
748 * @suppress **This is unstable API and it is subject to change.**
Roman Elizarov0a788392017-02-15 17:52:12 +0300749 */
Roman Elizarov3e342e32018-01-13 20:05:51 +0300750 protected open fun onReceiveEnqueued() {}
751
752 /**
753 * Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
754 * @suppress **This is unstable API and it is subject to change.**
755 */
756 protected open fun onReceiveDequeued() {}
Roman Elizarov0a788392017-02-15 17:52:12 +0300757
Roman Elizarov1216e912017-02-22 09:57:06 +0300758 // ------ private ------
759
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300760 private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
761 cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
762
763 private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : CancelHandler() {
764 override fun invoke(cause: Throwable?) {
Vsevolod Tolstopyatov80a29472018-04-17 16:02:02 +0300765 if (receive.remove())
Roman Elizarov3e342e32018-01-13 20:05:51 +0300766 onReceiveDequeued()
Roman Elizarov1216e912017-02-22 09:57:06 +0300767 }
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300768 override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
Roman Elizarov1216e912017-02-22 09:57:06 +0300769 }
770
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300771 private class Itr<E>(val channel: AbstractChannel<E>) : ChannelIterator<E> {
Roman Elizarov1216e912017-02-22 09:57:06 +0300772 var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300773
Roman Elizarov3e342e32018-01-13 20:05:51 +0300774 override suspend fun hasNext(): Boolean {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300775 // check for repeated hasNext
Roman Elizarov1216e912017-02-22 09:57:06 +0300776 if (result !== POLL_FAILED) return hasNextResult(result)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300777 // fast path -- try poll non-blocking
778 result = channel.pollInternal()
Roman Elizarov1216e912017-02-22 09:57:06 +0300779 if (result !== POLL_FAILED) return hasNextResult(result)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300780 // slow-path does suspend
781 return hasNextSuspend()
782 }
783
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300784 private fun hasNextResult(result: Any?): Boolean {
785 if (result is Closed<*>) {
786 if (result.closeCause != null) throw result.receiveException
787 return false
788 }
789 return true
790 }
791
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300792 private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300793 val receive = ReceiveHasNext(this, cont)
794 while (true) {
795 if (channel.enqueueReceive(receive)) {
796 cont.initCancellability() // make it properly cancellable
Roman Elizarov0a788392017-02-15 17:52:12 +0300797 channel.removeReceiveOnCancel(cont, receive)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300798 return@sc
799 }
800 // hm... something is not right. try to poll
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300801 val result = channel.pollInternal()
802 this.result = result
803 if (result is Closed<*>) {
804 if (result.closeCause == null)
805 cont.resume(false)
806 else
807 cont.resumeWithException(result.receiveException)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300808 return@sc
809 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300810 if (result !== POLL_FAILED) {
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300811 cont.resume(true)
812 return@sc
813 }
814 }
815 }
816
817 @Suppress("UNCHECKED_CAST")
Roman Elizarov3e342e32018-01-13 20:05:51 +0300818 override suspend fun next(): E {
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300819 val result = this.result
820 if (result is Closed<*>) throw result.receiveException
Roman Elizarov1216e912017-02-22 09:57:06 +0300821 if (result !== POLL_FAILED) {
822 this.result = POLL_FAILED
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300823 return result as E
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300824 }
825 // rare case when hasNext was not invoked yet -- just delegate to receive (leave state as is)
826 return channel.receive()
827 }
828 }
829
Roman Elizarov1216e912017-02-22 09:57:06 +0300830 private class ReceiveElement<in E>(
831 @JvmField val cont: CancellableContinuation<E?>,
832 @JvmField val nullOnClose: Boolean
833 ) : Receive<E>() {
834 override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300835 override fun completeResumeReceive(token: Any) = cont.completeResume(token)
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300836 override fun resumeReceiveClosed(closed: Closed<*>) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300837 if (closed.closeCause == null && nullOnClose)
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300838 cont.resume(null)
839 else
840 cont.resumeWithException(closed.receiveException)
841 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300842 override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300843 }
844
845 private class ReceiveHasNext<E>(
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300846 @JvmField val iterator: Itr<E>,
Roman Elizarov1216e912017-02-22 09:57:06 +0300847 @JvmField val cont: CancellableContinuation<Boolean>
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300848 ) : Receive<E>() {
Roman Elizarov1216e912017-02-22 09:57:06 +0300849 override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
850 val token = cont.tryResume(true, idempotent)
851 if (token != null) {
852 /*
853 When idempotent != null this invocation can be stale and we cannot directly update iterator.result
854 Instead, we save both token & result into a temporary IdempotentTokenValue object and
855 set iterator result only in completeResumeReceive that is going to be invoked just once
856 */
857 if (idempotent != null) return IdempotentTokenValue(token, value)
858 iterator.result = value
859 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300860 return token
861 }
862
Roman Elizarov1216e912017-02-22 09:57:06 +0300863 override fun completeResumeReceive(token: Any) {
864 if (token is IdempotentTokenValue<*>) {
865 iterator.result = token.value
866 cont.completeResume(token.token)
867 } else
868 cont.completeResume(token)
869 }
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300870
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300871 override fun resumeReceiveClosed(closed: Closed<*>) {
872 val token = if (closed.closeCause == null)
873 cont.tryResume(false)
874 else
875 cont.tryResumeWithException(closed.receiveException)
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300876 if (token != null) {
Roman Elizarovf6fed2a2017-02-03 19:12:09 +0300877 iterator.result = closed
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300878 cont.completeResume(token)
879 }
880 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300881 override fun toString(): String = "ReceiveHasNext[$cont]"
882 }
883
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300884 private inner class ReceiveSelect<R, in E>(
Roman Elizarov1216e912017-02-22 09:57:06 +0300885 @JvmField val select: SelectInstance<R>,
886 @JvmField val block: suspend (E?) -> R,
887 @JvmField val nullOnClose: Boolean
Roman Elizarovdaa1d9d2017-03-02 19:00:50 +0300888 ) : Receive<E>(), DisposableHandle {
Roman Elizarov1216e912017-02-22 09:57:06 +0300889 override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
890 if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
891
892 @Suppress("UNCHECKED_CAST")
893 override fun completeResumeReceive(token: Any) {
894 val value: E = (if (token === NULL_VALUE) null else token) as E
895 block.startCoroutine(value, select.completion)
896 }
897
898 override fun resumeReceiveClosed(closed: Closed<*>) {
Roman Elizarov932e8602017-06-21 17:21:37 +0300899 if (select.trySelect(null)) {
Roman Elizarov1216e912017-02-22 09:57:06 +0300900 if (closed.closeCause == null && nullOnClose) {
901 block.startCoroutine(null, select.completion)
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300902 } else {
Roman Elizarov932e8602017-06-21 17:21:37 +0300903 // even though we are dispatching coroutine to process channel close on receive,
904 // which is an atomically cancellable suspending function,
905 // close is a final state, so we can use a cancellable resume mode
906 select.resumeSelectCancellableWithException(closed.receiveException)
Roman Elizarova74eb5f2017-05-11 20:15:18 +0300907 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300908 }
909 }
910
Roman Elizarovdaa1d9d2017-03-02 19:00:50 +0300911 fun removeOnSelectCompletion() {
912 select.disposeOnSelect(this)
913 }
Roman Elizarov1216e912017-02-22 09:57:06 +0300914
Roman Elizarovdaa1d9d2017-03-02 19:00:50 +0300915 override fun dispose() { // invoked on select completion
Roman Elizarov2ad0e942017-02-28 19:14:08 +0300916 if (remove())
Roman Elizarov3e342e32018-01-13 20:05:51 +0300917 onReceiveDequeued() // notify cancellation of receive
Roman Elizarov1216e912017-02-22 09:57:06 +0300918 }
919
920 override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300921 }
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300922
923 private class IdempotentTokenValue<out E>(
924 @JvmField val token: Any,
925 @JvmField val value: E
926 )
Roman Elizarov7b2d8b02017-02-02 20:09:14 +0300927}
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300928
929/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300930@JvmField internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300931
932/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300933@JvmField internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300934
935/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300936@JvmField internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300937
938/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300939@JvmField internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300940
941/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300942@JvmField internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300943
944/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300945@JvmField internal val NULL_VALUE: Any = Symbol("NULL_VALUE")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300946
947/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300948@JvmField internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300949
950/** @suppress **This is unstable API and it is subject to change.** */
Roman Elizarov11d6b5b2018-04-26 10:11:50 +0300951@JvmField internal val SEND_RESUMED = Symbol("SEND_RESUMED")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300952
953/**
954 * Represents sending waiter in the queue.
955 * @suppress **This is unstable API and it is subject to change.**
956 */
957public interface Send {
958 val pollResult: Any? // E | Closed
959 fun tryResumeSend(idempotent: Any?): Any?
960 fun completeResumeSend(token: Any)
Roman Elizarovb555d912017-08-17 21:01:33 +0300961 fun resumeSendClosed(closed: Closed<*>)
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300962}
963
964/**
965 * Represents receiver waiter in the queue or closed token.
966 * @suppress **This is unstable API and it is subject to change.**
967 */
968public interface ReceiveOrClosed<in E> {
969 val offerResult: Any // OFFER_SUCCESS | Closed
970 fun tryResumeReceive(value: E, idempotent: Any?): Any?
971 fun completeResumeReceive(token: Any)
972}
973
974/**
Roman Elizarovb555d912017-08-17 21:01:33 +0300975 * Represents sender for a specific element.
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300976 * @suppress **This is unstable API and it is subject to change.**
977 */
978@Suppress("UNCHECKED_CAST")
979public class SendElement(
980 override val pollResult: Any?,
981 @JvmField val cont: CancellableContinuation<Unit>
982) : LockFreeLinkedListNode(), Send {
983 override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
984 override fun completeResumeSend(token: Any) = cont.completeResume(token)
Roman Elizarovb555d912017-08-17 21:01:33 +0300985 override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
Roman Elizarove3aa8ff2017-04-27 19:16:40 +0300986 override fun toString(): String = "SendElement($pollResult)[$cont]"
987}
988
989/**
990 * Represents closed channel.
991 * @suppress **This is unstable API and it is subject to change.**
992 */
993public class Closed<in E>(
994 @JvmField val closeCause: Throwable?
995) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
996 val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
997 val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
998
999 override val offerResult get() = this
1000 override val pollResult get() = this
1001 override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED
1002 override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
Roman Elizarove6e8ce82017-06-05 17:04:39 +03001003 override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
1004 override fun completeResumeReceive(token: Any) { check(token === CLOSE_RESUMED) }
Roman Elizarovb555d912017-08-17 21:01:33 +03001005 override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")
Roman Elizarove3aa8ff2017-04-27 19:16:40 +03001006 override fun toString(): String = "Closed[$closeCause]"
1007}
1008
1009private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
1010 override val offerResult get() = OFFER_SUCCESS
1011 abstract fun resumeReceiveClosed(closed: Closed<*>)
1012}
1013