Add fast `Semaphore`.

In addition, this introduces the `SegmentQueue` data structure, which emulates an infinite array with fast removal from the middle; it is intended for storing suspended acquirers in semaphore, mutex, and channel algorithms.

Fixes #1088
diff --git a/kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt b/kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt
new file mode 100644
index 0000000..4ad554f
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt
@@ -0,0 +1,194 @@
+package kotlinx.coroutines.internal
+
+import kotlinx.atomicfu.AtomicRef
+import kotlinx.atomicfu.atomic
+import kotlinx.atomicfu.loop
+
+/**
+ * Essentially, this segment queue is an infinite array of segments, which is represented as
+ * a Michael-Scott queue of them. All segments are instances of [Segment] class and
+ * follow in natural order (see [Segment.id]) in the queue.
+ *
+ * Segments are appended lazily by [getSegment], and only one segment with each id can be
+ * in the queue; segments that become logically removed are unlinked by [Segment.remove].
+ */
+internal abstract class SegmentQueue<S : Segment<S>> {
+    private val _head: AtomicRef<S>
+    /**
+     * Returns the first segment in the queue.
+     */
+    protected val head: S get() = _head.value
+
+    private val _tail: AtomicRef<S>
+    /**
+     * Returns the last segment in the queue.
+     */
+    protected val tail: S get() = _tail.value
+
+    init {
+        // `newSegment` is abstract and is called before subclasses are initialized,
+        // so implementations must not depend on their own state when creating a segment.
+        @Suppress("LeakingThis")
+        val initialSegment = newSegment(0)
+        _head = atomic(initialSegment)
+        _tail = atomic(initialSegment)
+    }
+
+    /**
+     * The implementation should create an instance of segment [S] with the specified [id]
+     * and initial reference to the previous segment [prev].
+     */
+    abstract fun newSegment(id: Long, prev: S? = null): S
+
+    /**
+     * Finds a segment with the specified [id] by following `next` references from the
+     * [startFrom] segment, appending new segments if the queue is too short.
+     * Returns `null` if there is no segment with this [id] on that path: it has already
+     * been removed from the queue, or [startFrom] is already beyond it.
+     *
+     * The typical use-case is reading [tail] or [head], doing some synchronization,
+     * and invoking [getSegment] or [getSegmentAndMoveHead] correspondingly
+     * to find the required segment.
+     */
+    protected fun getSegment(startFrom: S, id: Long): S? {
+        // Go through `next` references and add new segments if needed,
+        // similarly to the `push` in the Michael-Scott queue algorithm.
+        // The only difference is that `CAS failure` means that the
+        // required segment has already been added, so the algorithm just
+        // uses it. This way, only one segment with each id can be in the queue.
+        var cur: S = startFrom
+        while (cur.id < id) {
+            var curNext = cur.next
+            if (curNext == null) {
+                // Add a new segment.
+                val newTail = newSegment(cur.id + 1, cur)
+                curNext = if (cur.casNext(null, newTail)) {
+                    // `Segment.remove` does not unlink the tail, so a logically
+                    // removed `cur` has to be unlinked now that it has a successor.
+                    if (cur.removed) {
+                        cur.remove()
+                    }
+                    moveTailForward(newTail)
+                    newTail
+                } else {
+                    // CAS failure: another thread has already appended the next segment.
+                    cur.next!!
+                }
+            }
+            cur = curNext
+        }
+        return if (cur.id == id) cur else null
+    }
+
+    /**
+     * Invokes [getSegment] and replaces [head] with the result if its [id] is greater.
+     */
+    protected fun getSegmentAndMoveHead(startFrom: S, id: Long): S? {
+        if (startFrom.id == id) return startFrom
+        val s = getSegment(startFrom, id) ?: return null
+        moveHeadForward(s)
+        return s
+    }
+
+    /**
+     * Updates [head] to the specified segment
+     * unless the current head has a greater `id`.
+     */
+    private fun moveHeadForward(new: S) {
+        _head.loop { curHead ->
+            if (curHead.id > new.id) return
+            if (_head.compareAndSet(curHead, new)) {
+                // The head has no predecessor; `Segment.remove` relies on `prev == null` for it.
+                new.prev.value = null
+                return
+            }
+        }
+    }
+
+    /**
+     * Updates [tail] to the specified segment
+     * unless the current tail has a greater `id`.
+     */
+    private fun moveTailForward(new: S) {
+        _tail.loop { curTail ->
+            if (curTail.id > new.id) return
+            if (_tail.compareAndSet(curTail, new)) return
+        }
+    }
+}
+
+/**
+ * Each segment in [SegmentQueue] has a unique id and is created by [SegmentQueue.newSegment].
+ * Essentially, this is a node in the Michael-Scott queue algorithm, but with
+ * maintaining [prev] pointer for efficient [remove] implementation.
+ */
+internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?) {
+    // Pointer to the next segment, updates similarly to the Michael-Scott queue algorithm.
+    private val _next = atomic<S?>(null)
+    val next: S? get() = _next.value
+    fun casNext(expected: S?, value: S?): Boolean = _next.compareAndSet(expected, value)
+    // Pointer to the previous segment, updates in [remove] function;
+    // it is set to `null` when this segment becomes the head of the queue.
+    val prev = atomic<S?>(null)
+
+    /**
+     * Returns `true` if this segment is logically removed from the queue.
+     * The [remove] function should be called right after it becomes logically removed.
+     */
+    abstract val removed: Boolean
+
+    init {
+        this.prev.value = prev
+    }
+
+    /**
+     * Removes this segment physically from the segment queue. The segment should be
+     * logically removed (so [removed] returns `true`) at the point of invocation.
+     *
+     * The head (no `prev`) and the tail (no `next`) are left in place; a removed tail
+     * is unlinked by [SegmentQueue.getSegment] once a successor is appended.
+     */
+    fun remove() {
+        check(removed) { " The segment should be logically removed at first "}
+        // Read `next` and `prev` pointers.
+        var next = this._next.value ?: return // tail cannot be removed
+        var prev = prev.value ?: return // head cannot be removed
+        // Link `next` and `prev`, also jumping over neighbours that are removed as well.
+        prev.moveNextToRight(next)
+        while (prev.removed) {
+            prev = prev.prev.value ?: break
+            prev.moveNextToRight(next)
+        }
+        next.movePrevToLeft(prev)
+        while (next.removed) {
+            next = next.next ?: break
+            next.movePrevToLeft(prev)
+        }
+    }
+
+    /**
+     * Updates [next] pointer to the specified segment if
+     * the [id] of the specified segment is greater.
+     */
+    private fun moveNextToRight(next: S) {
+        while (true) {
+            // Expected to be non-null: this method is only called on a segment
+            // that precedes another one; bail out instead of an unchecked cast.
+            val curNext = this._next.value ?: return
+            if (next.id <= curNext.id) return
+            if (this._next.compareAndSet(curNext, next)) return
+        }
+    }
+
+    /**
+     * Updates [prev] pointer to the specified segment if
+     * the [id] of the specified segment is lower.
+     */
+    private fun movePrevToLeft(prev: S) {
+        while (true) {
+            val curPrev = this.prev.value ?: return
+            if (curPrev.id <= prev.id) return
+            if (this.prev.compareAndSet(curPrev, prev)) return
+        }
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
new file mode 100644
index 0000000..0ffb990
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -0,0 +1,236 @@
+package kotlinx.coroutines.sync
+
+import kotlinx.atomicfu.atomic
+import kotlinx.atomicfu.atomicArrayOfNulls
+import kotlinx.atomicfu.getAndUpdate
+import kotlinx.atomicfu.loop
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import kotlin.coroutines.resume
+import kotlin.math.max
+
+/**
+ * A counting semaphore for coroutines. It maintains a number of available permits.
+ * Each [acquire] suspends if necessary until a permit is available, and then takes it.
+ * Each [release] adds a permit, potentially releasing a suspended acquirer.
+ *
+ * Semaphore with `permits = 1` is essentially a [Mutex].
+ **/
+public interface Semaphore {
+    /**
+     * Returns the current number of permits available in this semaphore.
+     */
+    public val availablePermits: Int
+
+    /**
+     * Acquires a permit from this semaphore, suspending until one is available.
+     * All suspending acquirers are processed in first-in-first-out (FIFO) order.
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     *
+     * *Cancellation of suspended semaphore acquisition is atomic* -- when this function
+     * throws [CancellationException] it means that the semaphore was not acquired.
+     *
+     * Note, that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * Use [tryAcquire] to try to acquire a permit of this semaphore without suspension.
+     */
+    public suspend fun acquire()
+
+    /**
+     * Tries to acquire a permit from this semaphore without suspension.
+     *
+     * @return `true` if a permit was acquired, `false` otherwise.
+     */
+    public fun tryAcquire(): Boolean
+
+    /**
+     * Releases a permit, returning it into this semaphore. Resumes the first
+     * suspending acquirer if there is one at the point of invocation.
+     * Throws [IllegalStateException] if there is no acquired permit
+     * at the point of invocation.
+     */
+    public fun release()
+}
+
+/**
+ * Creates new [Semaphore] instance.
+ * @param permits the number of permits available in this semaphore.
+ * @param acquiredPermits the number of already acquired permits,
+ *        should be between `0` and `permits` (inclusively).
+ * @throws IllegalArgumentException if `permits` is not positive
+ *         or `acquiredPermits` is not in `0..permits`.
+ */
+@Suppress("FunctionName")
+public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
+
+/**
+ * Executes the given [action], acquiring a permit from this semaphore at the beginning
+ * and releasing it after the [action] is completed.
+ *
+ * @return the return value of the [action].
+ */
+public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
+    acquire()
+    try {
+        return action()
+    } finally {
+        release()
+    }
+}
+
+private class SemaphoreImpl(
+    private val permits: Int, acquiredPermits: Int
+) : Semaphore, SegmentQueue<SemaphoreSegment>() {
+    init {
+        require(permits > 0) { "Semaphore should have at least 1 permit" }
+        require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..permits" }
+    }
+
+    override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)
+
+    /**
+     * This counter indicates the number of available permits if it is non-negative,
+     * or the number of waiting acquirers with a minus sign otherwise. It starts from
+     * `permits - acquiredPermits`, so the already acquired permits are not available
+     * until they are released. Note, that 32-bit counter is enough here
+     * since the maximal number of available permits is [permits] which is [Int],
+     * and the maximum number of waiting acquirers cannot be greater than 2^31 in any
+     * real application.
+     */
+    private val _availablePermits = atomic(permits - acquiredPermits)
+    override val availablePermits: Int get() = max(_availablePermits.value, 0)
+
+    // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
+    // each segment contains a fixed number of slots. To determine a slot for each enqueue
+    // and dequeue operation, we increment the corresponding counter at the beginning of the operation
+    // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
+    // works with an individual cell.
+    private val enqIdx = atomic(0L)
+    private val deqIdx = atomic(0L)
+
+    override fun tryAcquire(): Boolean {
+        _availablePermits.loop { p ->
+            if (p <= 0) return false
+            if (_availablePermits.compareAndSet(p, p - 1)) return true
+        }
+    }
+
+    override suspend fun acquire() {
+        val p = _availablePermits.getAndDecrement()
+        if (p > 0) return // permit acquired
+        addToQueueAndSuspend()
+    }
+
+    override fun release() {
+        val p = incPermits()
+        if (p >= 0) return // no waiters
+        resumeNextFromQueue()
+    }
+
+    /**
+     * Increments the counter of available permits and returns its previous value.
+     * Unlike [release], it never resumes a waiting acquirer.
+     */
+    internal fun incPermits(): Int = _availablePermits.getAndUpdate { cur ->
+        check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" }
+        cur + 1
+    }
+
+    private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
+        val last = this.tail
+        val enqIdx = enqIdx.getAndIncrement()
+        val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
+        val i = (enqIdx % SEGMENT_SIZE).toInt()
+        if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
+            // already resumed: a permit has been delivered to this slot before suspension
+            cont.resume(Unit)
+            return@sc
+        }
+        cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
+    }
+
+    /**
+     * Delivers a permit to the first waiting acquirer that has not been cancelled.
+     * An empty slot (the acquirer has not suspended yet) is marked with `RESUMED`;
+     * cancelled slots and removed segments (all of whose slots are cancelled) are skipped.
+     */
+    @Suppress("UNCHECKED_CAST")
+    internal fun resumeNextFromQueue() {
+        tryAgain@ while (true) {
+            val first = this.head
+            val deqIdx = deqIdx.getAndIncrement()
+            val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@tryAgain
+            val i = (deqIdx % SEGMENT_SIZE).toInt()
+            val cont = segment.getAndSet(i, RESUMED)
+            if (cont === null) return // the acquirer will find `RESUMED` and not suspend
+            if (cont === CANCELLED) continue@tryAgain // skip the cancelled acquirer
+            (cont as CancellableContinuation<Unit>).resume(Unit)
+            return
+        }
+    }
+}
+
+/**
+ * Handles cancellation of a suspended [SemaphoreImpl.acquire]: undoes the decrement of the
+ * permits counter made by the cancelled acquirer. If a permit has already been delivered
+ * to the cancelled slot, that permit is passed to the next waiter instead of being lost.
+ */
+private class CancelSemaphoreAcquisitionHandler(
+    private val semaphore: SemaphoreImpl,
+    private val segment: SemaphoreSegment,
+    private val index: Int
+) : CancelHandler() {
+    override fun invoke(cause: Throwable?) {
+        // Do not call `release` here: it would resume the first waiter even though
+        // nobody has actually returned a permit, breaking the `permits` limit.
+        val p = semaphore.incPermits()
+        // A `release` has already accounted for this acquirer; the increment returns its permit.
+        if (p >= 0) return
+        // No permit has reached this slot yet; dequeuers will skip it.
+        if (segment.cancel(index)) return
+        // A permit has already been delivered to this slot; pass it to the next waiter.
+        semaphore.resumeNextFromQueue()
+    }
+
+    override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
+}
+
+private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
+    private val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
+
+    @Suppress("NOTHING_TO_INLINE")
+    inline fun get(index: Int): Any? = acquirers[index].value
+
+    @Suppress("NOTHING_TO_INLINE")
+    inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
+
+    @Suppress("NOTHING_TO_INLINE")
+    inline fun getAndSet(index: Int, value: Any?): Any? = acquirers[index].getAndSet(value)
+
+    private val cancelledSlots = atomic(0)
+    override val removed get() = cancelledSlots.value == SEGMENT_SIZE
+
+    // Marks the acquirer slot located by the specified index as cancelled
+    // and removes this segment physically if all slots are cancelled.
+    // Returns `false` if a permit had already been delivered to this slot.
+    fun cancel(index: Int): Boolean {
+        val cancelled = getAndSet(index, CANCELLED) !== RESUMED
+        if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
+            remove()
+        return cancelled
+    }
+
+    override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
+}
+
+// Marks a slot to which a permit has been delivered.
+@SharedImmutable
+private val RESUMED = Symbol("RESUMED")
+// Marks a slot whose acquirer has been cancelled.
+@SharedImmutable
+private val CANCELLED = Symbol("CANCELLED")
+@SharedImmutable
+private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt
new file mode 100644
index 0000000..a6aaf24
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt
@@ -0,0 +1,155 @@
+package kotlinx.coroutines.sync
+
+import kotlinx.coroutines.TestBase
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class SemaphoreTest : TestBase() {
+
+    @Test
+    fun testSimple() = runTest {
+        val semaphore = Semaphore(2)
+        launch {
+            expect(3)
+            semaphore.release()
+            expect(4)
+        }
+        expect(1)
+        semaphore.acquire()
+        semaphore.acquire()
+        expect(2)
+        semaphore.acquire()
+        finish(5)
+    }
+
+    @Test
+    fun testSimpleAsMutex() = runTest {
+        val semaphore = Semaphore(1)
+        expect(1)
+        launch {
+            expect(4)
+            semaphore.acquire() // suspends
+            expect(7) // now got lock
+            semaphore.release()
+            expect(8)
+        }
+        expect(2)
+        semaphore.acquire() // locked
+        expect(3)
+        yield() // yield to child
+        expect(5)
+        semaphore.release()
+        expect(6)
+        yield() // now child has lock
+        finish(9)
+    }
+
+    @Test
+    fun tryAcquireTest() = runTest {
+        val semaphore = Semaphore(2)
+        assertTrue(semaphore.tryAcquire())
+        assertTrue(semaphore.tryAcquire())
+        assertFalse(semaphore.tryAcquire())
+        assertEquals(0, semaphore.availablePermits)
+        semaphore.release()
+        assertEquals(1, semaphore.availablePermits)
+        assertTrue(semaphore.tryAcquire())
+        assertEquals(0, semaphore.availablePermits)
+    }
+
+    @Test
+    fun withSemaphoreTest() = runTest {
+        val semaphore = Semaphore(1)
+        assertEquals(1, semaphore.availablePermits)
+        semaphore.withPermit {
+            assertEquals(0, semaphore.availablePermits)
+        }
+        assertEquals(1, semaphore.availablePermits)
+    }
+
+    @Test
+    fun fairnessTest() = runTest {
+        val semaphore = Semaphore(1)
+        semaphore.acquire()
+        launch(coroutineContext) {
+            // first to acquire
+            expect(2)
+            semaphore.acquire() // suspend
+            expect(6)
+        }
+        launch(coroutineContext) {
+            // second to acquire
+            expect(3)
+            semaphore.acquire() // suspend
+            expect(9)
+        }
+        expect(1)
+        yield()
+        expect(4)
+        semaphore.release()
+        expect(5)
+        yield()
+        expect(7)
+        semaphore.release()
+        expect(8)
+        yield()
+        finish(10)
+    }
+
+    @Test
+    fun testCancellationReleasesSemaphore() = runTest {
+        val semaphore = Semaphore(1)
+        semaphore.acquire()
+        assertEquals(0, semaphore.availablePermits)
+        val job = launch {
+            assertFalse(semaphore.tryAcquire())
+            semaphore.acquire()
+        }
+        yield()
+        job.cancelAndJoin()
+        assertEquals(0, semaphore.availablePermits)
+        semaphore.release()
+        assertEquals(1, semaphore.availablePermits)
+    }
+
+    @Test
+    fun testAcquiredPermits() = runTest {
+        val semaphore = Semaphore(5, acquiredPermits = 4)
+        assertEquals(1, semaphore.availablePermits)
+        assertTrue(semaphore.tryAcquire())
+        assertFalse(semaphore.tryAcquire())
+        semaphore.release()
+        assertEquals(1, semaphore.availablePermits)
+    }
+
+    @Test
+    fun testCancellationDoesNotResumeOtherWaiters() = runTest {
+        val semaphore = Semaphore(1)
+        semaphore.acquire()
+        launch {
+            expect(2)
+            semaphore.acquire() // suspends until the main coroutine releases the permit
+            expect(6)
+            semaphore.release()
+        }
+        val job = launch {
+            expect(3)
+            semaphore.acquire() // suspends and gets cancelled
+            expectUnreached()
+        }
+        expect(1)
+        yield() // both children suspend in `acquire`
+        expect(4)
+        job.cancelAndJoin() // must not hand a permit to the first waiter
+        assertEquals(0, semaphore.availablePermits)
+        expect(5)
+        semaphore.release()
+        yield() // the first waiter takes the permit
+        finish(7)
+    }
+}
\ No newline at end of file