Add fast `Semaphore`.
In addition, the `SegmentQueue` data structure, which emulates an infinite array with fast removing from the middle, is introduced for storing suspended acquirers in semaphore/mutex/channel algorithms.
Fixes #1088
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
new file mode 100644
index 0000000..0ffb990
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -0,0 +1,213 @@
+package kotlinx.coroutines.sync
+
+import kotlinx.atomicfu.atomic
+import kotlinx.atomicfu.atomicArrayOfNulls
+import kotlinx.atomicfu.getAndUpdate
+import kotlinx.atomicfu.loop
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import kotlin.coroutines.resume
+import kotlin.math.max
+
+/**
+ * A counting semaphore for coroutines. It maintains a number of available permits.
+ * Each [acquire] suspends if necessary until a permit is available, and then takes it.
+ * Each [release] adds a permit, potentially releasing a suspended acquirer.
+ *
+ * Semaphore with `permits = 1` is essentially a [Mutex].
+ **/
+public interface Semaphore {
+ /**
+ * Returns the current number of permits available in this semaphore.
+ */
+ public val availablePermits: Int
+
+ /**
+ * Acquires a permit from this semaphore, suspending until one is available.
+ * All suspending acquirers are processed in first-in-first-out (FIFO) order.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended semaphore acquisition` is atomic* -- when this function
+ * throws [CancellationException] it means that the semaphore was not acquired.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
+ */
+ public suspend fun acquire()
+
+ /**
+ * Tries to acquire a permit from this semaphore without suspension.
+ *
+ * @return `true` if a permit was acquired, `false` otherwise.
+ */
+ public fun tryAcquire(): Boolean
+
+ /**
+ * Releases a permit, returning it into this semaphore. Resumes the first
+ * suspending acquirer if there is one at the point of invocation.
+ * Throws [IllegalStateException] if there is no acquired permit
+ * at the point of invocation.
+ */
+ public fun release()
+}
+
+/**
+ * Creates new [Semaphore] instance.
+ * @param permits the number of permits available in this semaphore.
+ * @param acquiredPermits the number of already acquired permits,
+ * should be between `0` and `permits` (inclusively).
+ */
+@Suppress("FunctionName")
+public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
+
+/**
+ * Executes the given [action], acquiring a permit from this semaphore at the beginning
+ * and releasing it after the [action] is completed.
+ *
+ * @return the return value of the [action].
+ */
+public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
+ acquire()
+ try {
+ return action()
+ } finally {
+ release()
+ }
+}
+
+private class SemaphoreImpl(
+ private val permits: Int, acquiredPermits: Int
+) : Semaphore, SegmentQueue<SemaphoreSegment>() {
+ init {
+ require(permits > 0) { "Semaphore should have at least 1 permit" }
+ require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..permits" }
+ }
+
+ override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)
+
+ /**
+ * This counter indicates a number of available permits if it is non-negative,
+ * or the size with minus sign otherwise. Note, that 32-bit counter is enough here
+ * since the maximal number of available permits is [permits] which is [Int],
+ * and the maximum number of waiting acquirers cannot be greater than 2^31 in any
+ * real application.
+ */
+ private val _availablePermits = atomic(permits)
+ override val availablePermits: Int get() = max(_availablePermits.value, 0)
+
+ // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
+ // each segment contains a fixed number of slots. To determine a slot for each enqueue
+ // and dequeue operation, we increment the corresponding counter at the beginning of the operation
+ // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
+ // works with an individual cell.
+ private val enqIdx = atomic(0L)
+ private val deqIdx = atomic(0L)
+
+ override fun tryAcquire(): Boolean {
+ _availablePermits.loop { p ->
+ if (p <= 0) return false
+ if (_availablePermits.compareAndSet(p, p - 1)) return true
+ }
+ }
+
+ override suspend fun acquire() {
+ val p = _availablePermits.getAndDecrement()
+ if (p > 0) return // permit acquired
+ addToQueueAndSuspend()
+ }
+
+ override fun release() {
+ val p = _availablePermits.getAndUpdate { cur ->
+ check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" }
+ cur + 1
+ }
+ if (p >= 0) return // no waiters
+ resumeNextFromQueue()
+ }
+
+ private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
+ val last = this.tail
+ val enqIdx = enqIdx.getAndIncrement()
+ val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
+ val i = (enqIdx % SEGMENT_SIZE).toInt()
+ if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
+ // already resumed
+ cont.resume(Unit)
+ return@sc
+ }
+ cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun resumeNextFromQueue() {
+ val first = this.head
+ val deqIdx = deqIdx.getAndIncrement()
+ val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: return
+ val i = (deqIdx % SEGMENT_SIZE).toInt()
+ val cont = segment.getAndUpdate(i) {
+ // Cancelled continuation invokes `release`
+ // and resumes next suspended acquirer if needed.
+ if (it === CANCELLED) return
+ RESUMED
+ }
+ if (cont === null) return // just resumed
+ (cont as CancellableContinuation<Unit>).resume(Unit)
+ }
+}
+
+private class CancelSemaphoreAcquisitionHandler(
+ private val semaphore: Semaphore,
+ private val segment: SemaphoreSegment,
+ private val index: Int
+) : CancelHandler() {
+ override fun invoke(cause: Throwable?) {
+ segment.cancel(index)
+ semaphore.release()
+ }
+
+ override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
+}
+
+private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
+ private val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
+
+ @Suppress("NOTHING_TO_INLINE")
+ inline fun get(index: Int): Any? = acquirers[index].value
+
+ @Suppress("NOTHING_TO_INLINE")
+ inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
+
+ inline fun getAndUpdate(index: Int, function: (Any?) -> Any?): Any? {
+ while (true) {
+ val cur = acquirers[index].value
+ val upd = function(cur)
+ if (cas(index, cur, upd)) return cur
+ }
+ }
+
+ private val cancelledSlots = atomic(0)
+ override val removed get() = cancelledSlots.value == SEGMENT_SIZE
+
+ // Cleans the acquirer slot located by the specified index
+ // and removes this segment physically if all slots are cleaned.
+ fun cancel(index: Int) {
+ // Clean the specified waiter
+ acquirers[index].value = CANCELLED
+ // Remove this segment if needed
+ if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
+ remove()
+ }
+
+ override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
+}
+
+@SharedImmutable
+private val RESUMED = Symbol("RESUMED")
+@SharedImmutable
+private val CANCELLED = Symbol("CANCELLED")
+@SharedImmutable
+private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)
\ No newline at end of file