Fixed linearizability of Channel.close operation

Send operations must ALWAYS help close the channel when they observe
that it was closed before throwing an exception.

Fixes #1419
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 7b8f96b..a3be3ba 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -160,14 +160,24 @@
         val result = offerInternal(element)
         return when {
             result === OFFER_SUCCESS -> true
-            // We should check for closed token on offer as well, otherwise offer won't be linearizable
-            // in the face of concurrent close()
-            result === OFFER_FAILED -> throw closedForSend?.sendException?.let { recoverStackTrace(it) } ?: return false
-            result is Closed<*> -> throw recoverStackTrace(result.sendException)
+            result === OFFER_FAILED -> {
+                // We should check for closed token on offer as well, otherwise offer won't be linearizable
+                // in the face of concurrent close()
+                // See https://github.com/Kotlin/kotlinx.coroutines/issues/359
+                throw recoverStackTrace(helpCloseAndGetSendException(closedForSend ?: return false))
+            }
+            result is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(result))
             else -> error("offerInternal returned $result")
         }
     }
 
+    private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
+        // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
+        // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
+        helpClose(closed)
+        return closed.sendException
+    }
+
     private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont ->
         loop@ while (true) {
             if (full) {
@@ -179,8 +189,7 @@
                         return@sc
                     }
                     enqueueResult is Closed<*> -> {
-                        helpClose(enqueueResult)
-                        cont.resumeWithException(enqueueResult.sendException)
+                        cont.helpCloseAndResumeWithSendException(enqueueResult)
                         return@sc
                     }
                     enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
@@ -197,8 +206,7 @@
                 }
                 offerResult === OFFER_FAILED -> continue@loop
                 offerResult is Closed<*> -> {
-                    helpClose(offerResult)
-                    cont.resumeWithException(offerResult.sendException)
+                    cont.helpCloseAndResumeWithSendException(offerResult)
                     return@sc
                 }
                 else -> error("offerInternal returned $offerResult")
@@ -206,6 +214,11 @@
         }
     }
 
+    private fun Continuation<*>.helpCloseAndResumeWithSendException(closed: Closed<*>) {
+        helpClose(closed)
+        resumeWithException(closed.sendException)
+    }
+
     /**
      * Result is:
      * * null -- successfully enqueued
@@ -230,23 +243,17 @@
 
     public override fun close(cause: Throwable?): Boolean {
         val closed = Closed<E>(cause)
-
         /*
          * Try to commit close by adding a close token to the end of the queue.
          * Successful -> we're now responsible for closing receivers
          * Not successful -> help closing pending receivers to maintain invariant
          * "if (!close()) next send will throw"
          */
-        val closeAdded = queue.addLastIfPrev(closed, { it !is Closed<*> })
-        if (!closeAdded) {
-            val actualClosed = queue.prevNode as Closed<*>
-            helpClose(actualClosed)
-            return false
-        }
-
-        helpClose(closed)
-        invokeOnCloseHandler(cause)
-        return true
+        val closeAdded = queue.addLastIfPrev(closed) { it !is Closed<*> }
+        val actuallyClosed = if (closeAdded) closed else queue.prevNode as Closed<*>
+        helpClose(actuallyClosed)
+        if (closeAdded) invokeOnCloseHandler(cause)
+        return closeAdded // true if we have closed
     }
 
     private fun invokeOnCloseHandler(cause: Throwable?) {
@@ -370,10 +377,7 @@
                         select.disposeOnSelect(node)
                         return
                     }
-                    enqueueResult is Closed<*> -> {
-                        helpClose(enqueueResult)
-                        throw recoverStackTrace(enqueueResult.sendException)
-                    }
+                    enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
                     enqueueResult === ENQUEUE_FAILED -> {} // try to offer
                     enqueueResult is Receive<*> -> {} // try to offer
                     else -> error("enqueueSend returned $enqueueResult ")
@@ -388,10 +392,7 @@
                     block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
                     return
                 }
-                offerResult is Closed<*> -> {
-                    helpClose(offerResult)
-                    throw recoverStackTrace(offerResult.sendException)
-                }
+                offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(offerResult))
                 else -> error("offerSelectInternal returned $offerResult")
             }
         }
@@ -432,7 +433,7 @@
 
     private class SendSelect<E, R>(
         override val pollResult: Any?,
-        @JvmField val channel: SendChannel<E>,
+        @JvmField val channel: AbstractSendChannel<E>,
         @JvmField val select: SelectInstance<R>,
         @JvmField val block: suspend (SendChannel<E>) -> R
     ) : Send(), DisposableHandle {
diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
index 3afc86c..2a73930 100644
--- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
 package kotlinx.coroutines.channels
@@ -29,8 +29,7 @@
             when {
                 result === OFFER_SUCCESS -> return OFFER_SUCCESS
                 result === OFFER_FAILED -> { // try to buffer
-                    val sendResult = sendBuffered(element)
-                    when (sendResult) {
+                    when (val sendResult = sendBuffered(element)) {
                         null -> return OFFER_SUCCESS
                         is Closed<*> -> return sendResult
                     }
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelCloseLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelCloseLCStressTest.kt
new file mode 100644
index 0000000..5bdc284
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/linearizability/ChannelCloseLCStressTest.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("unused")
+
+package kotlinx.coroutines.linearizability
+
+import com.devexperts.dxlab.lincheck.*
+import com.devexperts.dxlab.lincheck.annotations.*
+import com.devexperts.dxlab.lincheck.paramgen.*
+import com.devexperts.dxlab.lincheck.strategy.stress.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import org.junit.*
+import java.io.*
+
+/**
+ * This is stress test that is fine-tuned to catch the problem
+ * [#1419](https://github.com/Kotlin/kotlinx.coroutines/issues/1419)
+ */
+@Param(name = "value", gen = IntGen::class, conf = "2:2")
+@OpGroupConfig.OpGroupConfigs(
+    OpGroupConfig(name = "send", nonParallel = true),
+    OpGroupConfig(name = "receive", nonParallel = true),
+    OpGroupConfig(name = "close", nonParallel = true)
+)
+class ChannelCloseLCStressTest : TestBase() {
+
+    private companion object {
+        // Emulating ctor argument for lincheck
+        var capacity = 0
+    }
+
+    private val lt = LinTesting()
+    private var channel: Channel<Int> = Channel(capacity)
+
+    @Operation(runOnce = true, group = "send")
+    fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) }
+
+    @Operation(runOnce = true, group = "send")
+    fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) }
+
+    @Operation(runOnce = true, group = "receive")
+    fun receive1() = lt.run("receive1") { channel.receive() }
+
+    @Operation(runOnce = true, group = "receive")
+    fun receive2() = lt.run("receive2") { channel.receive() }
+
+    @Operation(runOnce = true, group = "close")
+    fun close1() = lt.run("close1") { channel.close(IOException("close1")) }
+
+    @Operation(runOnce = true, group = "close")
+    fun close2() = lt.run("close2") { channel.close(IOException("close2")) }
+
+    @Test
+    fun testRendezvousChannelLinearizability() {
+        runTest(0)
+    }
+
+    @Test
+    fun testArrayChannelLinearizability() {
+        for (i in listOf(1, 2, 16)) {
+            runTest(i)
+        }
+    }
+
+    @Test
+    fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED)
+
+    @Test
+    fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED)
+
+    private fun runTest(capacity: Int) {
+        ChannelCloseLCStressTest.capacity = capacity
+        val options = StressOptions()
+            .iterations(1) // only one iteration -- test scenario is fixed
+            .invocationsPerIteration(10_000 * stressTestMultiplierSqrt)
+            .threads(3)
+            .verifier(LinVerifier::class.java)
+        LinChecker.check(ChannelCloseLCStressTest::class.java, options)
+    }
+}