Fix race condition in pair select

This bug was introduced by #1524. The crux of problem is that
TryOffer/PollDesc.onPrepare method is no longer allowed to update
fields in these classes (like "resumeToken" and "pollResult") after call
to tryResumeSend/Receive method, because the latter will complete
the ongoing atomic operation and helper method might find it complete
and try reading "resumeToken" which was not initialized yet.

This change removes "pollResult" field which was not really needed
("result.pollResult" field is used) and removes "resumeToken" by
exploiting the fact that current implementation of
CancellableContinuationImpl does not need a token anymore. However,
CancellableContinuation.tryResume/completeResume ABI is left intact,
because it is used by 3rd party code.

This fix lead to overall simplification of the code. A number of fields
and an auxiliary IdempotentTokenValue class are removed, tokens used to
indicate various results are consolidated, so that resume success
is now consistently indicated by a single RESUME_TOKEN symbol.

Fixes #1561
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
index 9a19ed6..559afb8 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
@@ -14,6 +14,10 @@
 private const val SUSPENDED = 1
 private const val RESUMED = 2
 
+@JvmField
+@SharedImmutable
+internal val RESUME_TOKEN = Symbol("RESUME_TOKEN")
+
 /**
  * @suppress **This is unstable API and it is subject to change.**
  */
@@ -347,20 +351,21 @@
         parentHandle = NonDisposableHandle
     }
 
+    // Note: Always returns RESUME_TOKEN | null
     override fun tryResume(value: T, idempotent: Any?): Any? {
         _state.loop { state ->
             when (state) {
                 is NotCompleted -> {
                     val update: Any? = if (idempotent == null) value else
-                        CompletedIdempotentResult(idempotent, value, state)
+                        CompletedIdempotentResult(idempotent, value)
                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                     detachChildIfNonResuable()
-                    return state
+                    return RESUME_TOKEN
                 }
                 is CompletedIdempotentResult -> {
                     return if (state.idempotentResume === idempotent) {
                         assert { state.result === value } // "Non-idempotent resume"
-                        state.token
+                        RESUME_TOKEN
                     } else {
                         null
                     }
@@ -377,15 +382,16 @@
                     val update = CompletedExceptionally(exception)
                     if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                     detachChildIfNonResuable()
-                    return state
+                    return RESUME_TOKEN
                 }
                 else -> return null // cannot resume -- not active anymore
             }
         }
     }
 
+    // note: token is always RESUME_TOKEN
     override fun completeResume(token: Any) {
-        // note: We don't actually use token anymore, because handler needs to be invoked on cancellation only
+        assert { token === RESUME_TOKEN }
         dispatchResume(resumeMode)
     }
 
@@ -437,8 +443,7 @@
 
 private class CompletedIdempotentResult(
     @JvmField val idempotentResume: Any?,
-    @JvmField val result: Any?,
-    @JvmField val token: NotCompleted
+    @JvmField val result: Any?
 ) {
     override fun toString(): String = "CompletedIdempotentResult[$result]"
 }
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 22c1971..9f672af 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -48,7 +48,8 @@
             val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
             val token = receive.tryResumeReceive(element, null)
             if (token != null) {
-                receive.completeResumeReceive(token)
+                assert { token === RESUME_TOKEN }
+                receive.completeResumeReceive(element)
                 return receive.offerResult
             }
         }
@@ -65,7 +66,7 @@
         val failure = select.performAtomicTrySelect(offerOp)
         if (failure != null) return failure
         val receive = offerOp.result
-        receive.completeResumeReceive(offerOp.resumeToken!!)
+        receive.completeResumeReceive(element)
         return receive.offerResult
     }
 
@@ -354,8 +355,6 @@
         @JvmField val element: E,
         queue: LockFreeLinkedListHead
     ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
-        @JvmField var resumeToken: Any? = null
-
         override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
             is Closed<*> -> affected
             !is ReceiveOrClosed<*> -> OFFER_FAILED
@@ -367,7 +366,7 @@
             val affected = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
             val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED
             if (token === RETRY_ATOMIC) return RETRY_ATOMIC
-            resumeToken = token
+            assert { token === RESUME_TOKEN }
             return null
         }
     }
@@ -454,8 +453,7 @@
         override fun tryResumeSend(otherOp: PrepareOp?): Any? =
             select.trySelectOther(otherOp)
 
-        override fun completeResumeSend(token: Any) {
-            assert { token === SELECT_STARTED }
+        override fun completeResumeSend() {
             block.startCoroutine(receiver = channel, completion = select.completion)
         }
 
@@ -475,8 +473,8 @@
         @JvmField val element: E
     ) : Send() {
         override val pollResult: Any? get() = element
-        override fun tryResumeSend(otherOp: PrepareOp?): Any?  = SEND_RESUMED.also { otherOp?.finishPrepare() }
-        override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
+        override fun tryResumeSend(otherOp: PrepareOp?): Any?  = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+        override fun completeResumeSend() {}
         override fun resumeSendClosed(closed: Closed<*>) {}
     }
 }
@@ -511,7 +509,8 @@
             val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
             val token = send.tryResumeSend(null)
             if (token != null) {
-                send.completeResumeSend(token)
+                assert { token === RESUME_TOKEN }
+                send.completeResumeSend()
                 return send.pollResult
             }
         }
@@ -528,8 +527,8 @@
         val failure = select.performAtomicTrySelect(pollOp)
         if (failure != null) return failure
         val send = pollOp.result
-        send.completeResumeSend(pollOp.resumeToken!!)
-        return pollOp.pollResult
+        send.completeResumeSend()
+        return pollOp.result.pollResult
     }
 
     // ------ state functions & helpers for concrete implementations ------
@@ -673,9 +672,6 @@
      * @suppress **This is unstable API and it is subject to change.**
      */
     protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
-        @JvmField var resumeToken: Any? = null
-        @JvmField var pollResult: E? = null
-
         override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
             is Closed<*> -> affected
             !is Send -> POLL_FAILED
@@ -687,8 +683,7 @@
             val affected = prepareOp.affected as Send // see "failure" impl
             val token = affected.tryResumeSend(prepareOp) ?: return REMOVE_PREPARED
             if (token === RETRY_ATOMIC) return RETRY_ATOMIC
-            resumeToken = token
-            pollResult = affected.pollResult as E
+            assert { token === RESUME_TOKEN }
             return null
         }
     }
@@ -908,7 +903,8 @@
             return cont.tryResume(resumeValue(value), otherOp?.desc)
         }
 
-        override fun completeResumeReceive(token: Any) = cont.completeResume(token)
+        override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
+
         override fun resumeReceiveClosed(closed: Closed<*>) {
             when {
                 receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
@@ -925,25 +921,16 @@
     ) : Receive<E>() {
         override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
             otherOp?.finishPrepare()
-            val token = cont.tryResume(true, otherOp?.desc)
-            if (token != null) {
-                /*
-                   When otherOp != null this invocation can be stale and we cannot directly update iterator.result
-                   Instead, we save both token & result into a temporary IdempotentTokenValue object and
-                   set iterator result only in completeResumeReceive that is going to be invoked just once
-                 */
-                if (otherOp != null) return IdempotentTokenValue(token, value)
-                iterator.result = value
-            }
-            return token
+            return cont.tryResume(true, otherOp?.desc)
         }
 
-        override fun completeResumeReceive(token: Any) {
-            if (token is IdempotentTokenValue<*>) {
-                iterator.result = token.value
-                cont.completeResume(token.token)
-            } else
-                cont.completeResume(token)
+        override fun completeResumeReceive(value: E) {
+            /*
+               When otherOp != null invocation of tryResumeReceive can happen multiple times and much later,
+               but completeResumeReceive is called once so we set iterator result here.
+             */
+            iterator.result = value
+            cont.completeResume(RESUME_TOKEN)
         }
 
         override fun resumeReceiveClosed(closed: Closed<*>) {
@@ -966,14 +953,11 @@
         @JvmField val block: suspend (Any?) -> R,
         @JvmField val receiveMode: Int
     ) : Receive<E>(), DisposableHandle {
-        override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? {
-            val result = select.trySelectOther(otherOp)
-            return if (result === SELECT_STARTED) value ?: NULL_VALUE else result
-        }
+        override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? =
+            select.trySelectOther(otherOp)
 
         @Suppress("UNCHECKED_CAST")
-        override fun completeResumeReceive(token: Any) {
-            val value: E = NULL_VALUE.unbox<E>(token)
+        override fun completeResumeReceive(value: E) {
             block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
         }
 
@@ -997,11 +981,6 @@
 
         override fun toString(): String = "ReceiveSelect[$select,receiveMode=$receiveMode]"
     }
-
-    private class IdempotentTokenValue<out E>(
-        @JvmField val token: Any,
-        @JvmField val value: E
-    )
 }
 
 // receiveMode values
@@ -1027,18 +1006,6 @@
 
 @JvmField
 @SharedImmutable
-internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE")
-
-@JvmField
-@SharedImmutable
-internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
-
-@JvmField
-@SharedImmutable
-internal val SEND_RESUMED: Any = Symbol("SEND_RESUMED")
-
-@JvmField
-@SharedImmutable
 internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED")
 
 internal typealias Handler = (Throwable?) -> Unit
@@ -1050,10 +1017,10 @@
     abstract val pollResult: Any? // E | Closed
     // Returns: null - failure,
     //          RETRY_ATOMIC for retry (only when otherOp != null),
-    //          otherwise token for completeResumeSend
+    //          RESUME_TOKEN on success (call completeResumeSend)
     // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
     abstract fun tryResumeSend(otherOp: PrepareOp?): Any?
-    abstract fun completeResumeSend(token: Any)
+    abstract fun completeResumeSend()
     abstract fun resumeSendClosed(closed: Closed<*>)
 }
 
@@ -1064,10 +1031,10 @@
     val offerResult: Any // OFFER_SUCCESS | Closed
     // Returns: null - failure,
     //          RETRY_ATOMIC for retry (only when otherOp != null),
-    //          otherwise token for completeResumeReceive
+    //          RESUME_TOKEN on success (call completeResumeReceive)
     // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
     fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any?
-    fun completeResumeReceive(token: Any)
+    fun completeResumeReceive(value: E)
 }
 
 /**
@@ -1082,7 +1049,7 @@
         otherOp?.finishPrepare()
         return cont.tryResume(Unit, otherOp?.desc)
     }
-    override fun completeResumeSend(token: Any) = cont.completeResume(token)
+    override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
     override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
     override fun toString(): String = "SendElement($pollResult)"
 }
@@ -1098,10 +1065,10 @@
 
     override val offerResult get() = this
     override val pollResult get() = this
-    override fun tryResumeSend(otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() }
-    override fun completeResumeSend(token: Any) { assert { token === CLOSE_RESUMED } }
-    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() }
-    override fun completeResumeReceive(token: Any) { assert { token === CLOSE_RESUMED } }
+    override fun tryResumeSend(otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+    override fun completeResumeSend() {}
+    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+    override fun completeResumeReceive(value: E) {}
     override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
     override fun toString(): String = "Closed[$closeCause]"
 }
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
index c9fb3cc..5f3eb32 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt
@@ -143,7 +143,6 @@
     private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
         // update head in a tail rec loop
         var send: Send? = null
-        var token: Any? = null
         bufferLock.withLock {
             if (addSub != null) {
                 addSub.subHead = tail // start from last element
@@ -172,8 +171,9 @@
                     while (true) {
                         send = takeFirstSendOrPeekClosed() ?: break // when when no sender
                         if (send is Closed<*>) break // break when closed for send
-                        token = send!!.tryResumeSend(null)
+                        val token = send!!.tryResumeSend(null)
                         if (token != null) {
+                            assert { token === RESUME_TOKEN }
                             // put sent element to the buffer
                             buffer[(tail % capacity).toInt()] = (send as Send).pollResult
                             this.size = size + 1
@@ -186,7 +186,7 @@
             return // done updating here -> return
         }
         // we only get out of the lock normally when there is a sender to resume
-        send!!.completeResumeSend(token!!)
+        send!!.completeResumeSend()
         // since we've just sent an element, we might need to resume some receivers
         checkSubOffers()
         // tailrec call to recheck
@@ -239,9 +239,9 @@
                 // it means that `checkOffer` must be retried after every `unlock`
                 if (!subLock.tryLock()) break
                 val receive: ReceiveOrClosed<E>?
-                val token: Any?
+                var result: Any?
                 try {
-                    val result = peekUnderLock()
+                    result = peekUnderLock()
                     when {
                         result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock
                         result is Closed<*> -> {
@@ -252,15 +252,15 @@
                     // find a receiver for an element
                     receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving
                     if (receive is Closed<*>) break // noting more to do if this sub already closed
-                    token = receive.tryResumeReceive(result as E, null)
-                    if (token == null) continue // bail out here to next iteration (see for next receiver)
+                    val token = receive.tryResumeReceive(result as E, null) ?: continue
+                    assert { token === RESUME_TOKEN }
                     val subHead = this.subHead
                     this.subHead = subHead + 1 // retrieved element for this subscriber
                     updated = true
                 } finally {
                     subLock.unlock()
                 }
-                receive!!.completeResumeReceive(token!!)
+                receive!!.completeResumeReceive(result as E)
             }
             // do close outside of lock if needed
             closed?.also { close(cause = it.closeCause) }
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
index 0b850b2..f10713d 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
@@ -50,7 +50,6 @@
     // result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
     protected override fun offerInternal(element: E): Any {
         var receive: ReceiveOrClosed<E>? = null
-        var token: Any? = null
         lock.withLock {
             val size = this.size
             closedForSend?.let { return it }
@@ -65,8 +64,9 @@
                             this.size = size // restore size
                             return receive!!
                         }
-                        token = receive!!.tryResumeReceive(element, null)
+                        val token = receive!!.tryResumeReceive(element, null)
                         if (token != null) {
+                            assert { token === RESUME_TOKEN }
                             this.size = size // restore size
                             return@withLock
                         }
@@ -80,14 +80,13 @@
             return OFFER_FAILED
         }
         // breaks here if offer meets receiver
-        receive!!.completeResumeReceive(token!!)
+        receive!!.completeResumeReceive(element)
         return receive!!.offerResult
     }
 
     // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`
     protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
         var receive: ReceiveOrClosed<E>? = null
-        var token: Any? = null
         lock.withLock {
             val size = this.size
             closedForSend?.let { return it }
@@ -103,8 +102,6 @@
                             failure == null -> { // offered successfully
                                 this.size = size // restore size
                                 receive = offerOp.result
-                                token = offerOp.resumeToken
-                                assert { token != null }
                                 return@withLock
                             }
                             failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
@@ -130,7 +127,7 @@
             return OFFER_FAILED
         }
         // breaks here if offer meets receiver
-        receive!!.completeResumeReceive(token!!)
+        receive!!.completeResumeReceive(element)
         return receive!!.offerResult
     }
 
@@ -150,7 +147,7 @@
     // result is `E | POLL_FAILED | Closed`
     protected override fun pollInternal(): Any? {
         var send: Send? = null
-        var token: Any? = null
+        var resumed = false
         var result: Any? = null
         lock.withLock {
             val size = this.size
@@ -164,8 +161,10 @@
             if (size == capacity) {
                 loop@ while (true) {
                     send = takeFirstSendOrPeekClosed() ?: break
-                    token = send!!.tryResumeSend(null)
+                    val token = send!!.tryResumeSend(null)
                     if (token != null) {
+                        assert { token === RESUME_TOKEN }
+                        resumed = true
                         replacement = send!!.pollResult
                         break@loop
                     }
@@ -178,15 +177,15 @@
             head = (head + 1) % buffer.size
         }
         // complete send the we're taken replacement from
-        if (token != null)
-            send!!.completeResumeSend(token!!)
+        if (resumed)
+            send!!.completeResumeSend()
         return result
     }
 
     // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
     protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
         var send: Send? = null
-        var token: Any? = null
+        var success = false
         var result: Any? = null
         lock.withLock {
             val size = this.size
@@ -204,8 +203,7 @@
                     when {
                         failure == null -> { // polled successfully
                             send = pollOp.result
-                            token = pollOp.resumeToken
-                            assert { token != null }
+                            success = true
                             replacement = send!!.pollResult
                             break@loop
                         }
@@ -218,7 +216,7 @@
                         }
                         failure is Closed<*> -> {
                             send = failure
-                            token = failure.tryResumeSend(null)
+                            success = true
                             replacement = failure
                             break@loop
                         }
@@ -240,8 +238,8 @@
             head = (head + 1) % buffer.size
         }
         // complete send the we're taken replacement from
-        if (token != null)
-            send!!.completeResumeSend(token!!)
+        if (success)
+            send!!.completeResumeSend()
         return result
     }
 
diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt
index 5318d14..550ea1f 100644
--- a/kotlinx-coroutines-core/common/src/selects/Select.kt
+++ b/kotlinx-coroutines-core/common/src/selects/Select.kt
@@ -87,10 +87,6 @@
     public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
 }
 
-@JvmField
-@SharedImmutable
-internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
-
 /**
  * Internal representation of select instance. This instance is called _selected_ when
  * the clause to execute is already picked.
@@ -111,7 +107,7 @@
 
     /**
      * Tries to select this instance. Returns:
-     * * [SELECT_STARTED] on success,
+     * * [RESUME_TOKEN] on success,
      * * [RETRY_ATOMIC] on deadlock (needs retry, it is only possible when [otherOp] is not `null`)
      * * `null` on failure to select (already selected).
      * [otherOp] is not null when trying to rendezvous with this select from inside of another select.
@@ -370,7 +366,7 @@
     override fun trySelect(): Boolean {
         val result = trySelectOther(null)
         return when {
-            result === SELECT_STARTED -> true
+            result === RESUME_TOKEN -> true
             result == null -> false
             else -> error("Unexpected trySelectIdempotent result $result")
         }
@@ -460,7 +456,7 @@
      */
 
     // it is just like plain trySelect, but support idempotent start
-    // Returns SELECT_STARTED | RETRY_ATOMIC | null (when already selected)
+    // Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected)
     override fun trySelectOther(otherOp: PrepareOp?): Any? {
         _state.loop { state -> // lock-free loop on state
             when {
@@ -477,7 +473,7 @@
                         if (decision !== null) return decision
                     }
                     doAfterSelect()
-                    return SELECT_STARTED
+                    return RESUME_TOKEN
                 }
                 state is OpDescriptor -> { // state is either AtomicSelectOp or PairSelectOp
                     // Found descriptor of ongoing operation while working in the context of other select operation
@@ -512,7 +508,7 @@
                 }
                 // otherwise -- already selected
                 otherOp == null -> return null // already selected
-                state === otherOp.desc -> return SELECT_STARTED // was selected with this marker
+                state === otherOp.desc -> return RESUME_TOKEN // was selected with this marker
                 else -> return null // selected with different marker
             }
         }