Add fast `Semaphore`.

In addition, the `SegmentQueue` data structure, which emulates an infinite array with fast removal from the middle, is introduced for storing suspended acquirers in semaphore/mutex/channel algorithms.

Fixes #1088
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
new file mode 100644
index 0000000..0ffb990
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -0,0 +1,213 @@
+package kotlinx.coroutines.sync
+
+import kotlinx.atomicfu.atomic
+import kotlinx.atomicfu.atomicArrayOfNulls
+import kotlinx.atomicfu.getAndUpdate
+import kotlinx.atomicfu.loop
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import kotlin.coroutines.resume
+import kotlin.math.max
+
+/**
+ * A counting semaphore for coroutines. It maintains a number of available permits.
+ * Each [acquire] suspends if necessary until a permit is available, and then takes it.
+ * Each [release] adds a permit, potentially releasing a suspended acquirer.
+ *
+ * A semaphore with `permits = 1` is essentially a [Mutex].
+ */
+public interface Semaphore {
+    /**
+     * Returns the current number of permits available in this semaphore.
+     */
+    public val availablePermits: Int
+
+    /**
+     * Acquires a permit from this semaphore, suspending until one is available.
+     * All suspending acquirers are processed in first-in-first-out (FIFO) order.
+     *
+     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+     * function is suspended, this function immediately resumes with [CancellationException].
+     *
+     * *Cancellation of suspended semaphore acquisition is atomic* -- when this function
+     * throws [CancellationException] it means that the permit was not acquired.
+     *
+     * Note that this function does not check for cancellation when it is not suspended.
+     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+     *
+     * Use [tryAcquire] to try to acquire a permit of this semaphore without suspension.
+     */
+    public suspend fun acquire()
+
+    /**
+     * Tries to acquire a permit from this semaphore without suspension.
+     *
+     * @return `true` if a permit was acquired, `false` otherwise.
+     */
+    public fun tryAcquire(): Boolean
+
+    /**
+     * Releases a permit, returning it to this semaphore. Resumes the first
+     * suspended acquirer if there is one at the point of invocation.
+     * Throws [IllegalStateException] if there is no acquired permit
+     * at the point of invocation.
+     */
+    public fun release()
+}
+
+/**
+ * Creates a new [Semaphore] instance.
+ * @param permits the total number of permits of the semaphore; must be positive.
+ * @param acquiredPermits how many permits are already taken; must be in `0..permits`.
+ */
+@Suppress("FunctionName")
+public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore =
+    SemaphoreImpl(permits = permits, acquiredPermits = acquiredPermits)
+
+/**
+ * Runs [action] while holding a permit of this semaphore: the permit is acquired before
+ * [action] starts and is released once [action] finishes, normally or with an exception.
+ *
+ * @return the value produced by [action].
+ */
+public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
+    acquire()
+    return try {
+        action()
+    } finally {
+        release()
+    }
+}
+
+/** [Semaphore] implementation: an atomic permit counter plus a [SegmentQueue] of suspended acquirers. */
+private class SemaphoreImpl(
+    private val permits: Int, acquiredPermits: Int
+) : Semaphore, SegmentQueue<SemaphoreSegment>() {
+    init {
+        require(permits > 0) { "Semaphore should have at least 1 permit" }
+        require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..permits" }
+    }
+
+    override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)
+
+    /**
+     * The number of available permits when non-negative; otherwise the number of waiting
+     * acquirers with a minus sign. It starts at `permits - acquiredPermits`. A 32-bit counter
+     * is enough: the value never exceeds [permits], which is an [Int], and the number of
+     * waiting acquirers cannot be greater than 2^31 in any real application.
+     */
+    private val _availablePermits = atomic(permits - acquiredPermits)
+    override val availablePermits: Int get() = max(_availablePermits.value, 0)
+
+    // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
+    // each segment contains a fixed number of slots. Every enqueue and dequeue operation increments
+    // its own counter first and uses the value before the increment as its slot number,
+    // so each enqueue-dequeue pair works with an individual cell.
+    private val enqIdx = atomic(0L)
+    private val deqIdx = atomic(0L)
+
+    override fun tryAcquire(): Boolean {
+        _availablePermits.loop { p ->
+            if (p <= 0) return false // no free permit; unlike `acquire`, never joins the queue
+            if (_availablePermits.compareAndSet(p, p - 1)) return true
+        }
+    }
+
+    override suspend fun acquire() {
+        val p = _availablePermits.getAndDecrement() // always decrements: a negative value counts waiters
+        if (p > 0) return // permit acquired
+        addToQueueAndSuspend()
+    }
+
+    override fun release() {
+        val p = _availablePermits.getAndUpdate { cur ->
+            check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" }
+            cur + 1
+        }
+        if (p >= 0) return // no waiters
+        resumeNextFromQueue()
+    }
+
+    // Stores the continuation in the next enqueue cell, or resumes it at once if the cell was already processed.
+    private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
+        val last = this.tail
+        val enqIdx = enqIdx.getAndIncrement()
+        val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
+        val i = (enqIdx % SEGMENT_SIZE).toInt()
+        if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
+            // already resumed: the permit is taken without suspending
+            cont.resume(Unit)
+            return@sc
+        }
+        cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
+    }
+
+    // Marks the next dequeue cell as RESUMED and resumes the continuation stored there, if any.
+    @Suppress("UNCHECKED_CAST")
+    private fun resumeNextFromQueue() {
+        val first = this.head
+        val deqIdx = deqIdx.getAndIncrement()
+        val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: return
+        val i = (deqIdx % SEGMENT_SIZE).toInt()
+        val cont = segment.getAndUpdate(i) {
+            // A cancelled acquirer invokes `release` itself, which resumes the next waiter if needed.
+            if (it === CANCELLED) return
+            RESUMED
+        }
+        if (cont === null) return // just resumed
+        (cont as CancellableContinuation<Unit>).resume(Unit)
+    }
+}
+
+private class CancelSemaphoreAcquisitionHandler( // cancellation handler installed for a suspended `acquire`
+    private val semaphore: Semaphore,
+    private val segment: SemaphoreSegment,
+    private val index: Int
+) : CancelHandler() {
+    override fun invoke(cause: Throwable?) {
+        segment.cancel(index) // mark the cell as CANCELLED first, so that a later `release` skips this acquirer
+        semaphore.release() // then give back the permit that the cancelled `acquire` took from the counter
+    }
+
+    override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
+}
+
+private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
+    private val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE) // cells: null, a continuation, RESUMED or CANCELLED
+
+    @Suppress("NOTHING_TO_INLINE")
+    inline fun get(index: Int): Any? = acquirers[index].value // current content of the cell
+
+    @Suppress("NOTHING_TO_INLINE")
+    inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
+
+    inline fun getAndUpdate(index: Int, function: (Any?) -> Any?): Any? {
+        while (true) { // retry until the CAS succeeds (or `function` leaves via a non-local return)
+            val cur = acquirers[index].value
+            val upd = function(cur)
+            if (cas(index, cur, upd)) return cur // the previous value is returned, not the new one
+        }
+    }
+
+    private val cancelledSlots = atomic(0) // number of `cancel` calls made on this segment
+    override val removed get() = cancelledSlots.value == SEGMENT_SIZE // true once every cell has been cancelled
+
+    // Marks the acquirer cell located by the specified index as CANCELLED
+    // and removes this segment physically if all of its cells are cancelled.
+    fun cancel(index: Int) {
+        // Overwrite the waiter stored in the cell
+        acquirers[index].value = CANCELLED
+        // The call that cancels the last remaining cell removes the segment
+        if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
+            remove()
+    }
+
+    override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
+}
+
+@SharedImmutable
+private val RESUMED = Symbol("RESUMED") // cell marker written by `release` once it has processed the cell
+@SharedImmutable
+private val CANCELLED = Symbol("CANCELLED") // cell marker written when the acquirer of the cell is cancelled
+@SharedImmutable
+private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16) // cells per segment; 16 is presumably the default -- confirm in `systemProp`
\ No newline at end of file