Fix of a bug in the semaphore (#1477)

* Add a failing test for a semaphore

This test consistently fails for the current implementation. It
attempts to cause the following state: after `job2` increments
`availablePermits` but before it wakes up the acquirer in the
queue, the acquirer is cancelled. Then, regardless of whether
`RESUMED` or `CANCELLED` was written first, another cell in the
queue is marked to be resumed. However, this is incorrect: on
cancellation, the acquirer incremented the number of available
permits once more, making it `1`; thus, at the same time there
exist two permits for acquiring the mutex. At the next loop
iteration, a new acquirer tries to claim ownership of the mutex and
succeeds because it goes to the thread queue and sees its cell as
`RESUMED`. Thus, two entities own a mutex at the same time.

* Fix a bug in semaphore implementation

The fix works as follows: if `availablePermits` is negative, its
absolute value denotes the logical length of the thread queue.
Increasing its value if it was negative means that this thread
promises to wake exactly one thread, and if its positive, returns
one permit to the semaphore itself.

Before, the error was in that a queue could be of negative length:
if it consisted of only `N` cells, and `N` resume queries arrived,
cancelling any threads would mean that there are more wakers then
there are sleepers, which breaks the invariants of the semaphore.
Thus, if on cancellation the acquirer detects that it leaves the
queue empty in the presence of resumers, it simply transfers the
semaphore acquisition permit to the semaphore itself, because it
knows that it, in a sense, owns it already: there is a thread that
is bound to resume this cell.
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
index 6ab377d..a9df15c 100644
--- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -166,7 +166,8 @@
     private val index: Int
 ) : CancelHandler() {
     override fun invoke(cause: Throwable?) {
-        semaphore.incPermits()
+        val p = semaphore.incPermits()
+        if (p >= 0) return
         if (segment.cancel(index)) return
         semaphore.resumeNextFromQueue()
     }
diff --git a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
index cdfcc6b..e872b52 100644
--- a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
@@ -2,6 +2,7 @@
 
 import kotlinx.coroutines.*
 import org.junit.Test
+import org.junit.After
 import kotlin.test.assertEquals
 
 class SemaphoreStressTest : TestBase() {
@@ -57,4 +58,39 @@
         semaphore.release()
         assertEquals(1, semaphore.availablePermits)
     }
-}
\ No newline at end of file
+
+    /**
+     * This checks if repeated releases that race with cancellations put
+     * the semaphore into an incorrect state where permits are leaked.
+     */
+    @Test
+    fun stressReleaseCancelRace() = runTest {
+        val n = 10_000 * stressTestMultiplier
+        val semaphore = Semaphore(1, 1)
+        newSingleThreadContext("SemaphoreStressTest").use { pool ->
+            repeat (n) {
+                // Initially, we hold the permit and no one else can `acquire`,
+                // otherwise it's a bug.
+                assertEquals(0, semaphore.availablePermits)
+                var job1_entered_critical_section = false
+                val job1 = launch(start = CoroutineStart.UNDISPATCHED) {
+                    semaphore.acquire()
+                    job1_entered_critical_section = true
+                    semaphore.release()
+                }
+                // check that `job1` didn't finish the call to `acquire()`
+                assertEquals(false, job1_entered_critical_section)
+                val job2 = launch(pool) {
+                    semaphore.release()
+                }
+                // Because `job2` executes in a separate thread, this
+                // cancellation races with the call to `release()`.
+                job1.cancelAndJoin()
+                job2.join()
+                assertEquals(1, semaphore.availablePermits)
+                semaphore.acquire()
+            }
+        }
+    }
+
+}