Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame^] | 1 | package kotlinx.coroutines.sync |
| 2 | |
| 3 | import kotlinx.atomicfu.atomic |
| 4 | import kotlinx.atomicfu.atomicArrayOfNulls |
| 5 | import kotlinx.atomicfu.getAndUpdate |
| 6 | import kotlinx.atomicfu.loop |
| 7 | import kotlinx.coroutines.* |
| 8 | import kotlinx.coroutines.internal.* |
| 9 | import kotlin.coroutines.resume |
| 10 | import kotlin.math.max |
| 11 | |
| 12 | /** |
| 13 | * A counting semaphore for coroutines. It maintains a number of available permits. |
| 14 | * Each [acquire] suspends if necessary until a permit is available, and then takes it. |
| 15 | * Each [release] adds a permit, potentially releasing a suspended acquirer. |
| 16 | * |
| 17 | * Semaphore with `permits = 1` is essentially a [Mutex]. |
| 18 | **/ |
| 19 | public interface Semaphore { |
| 20 | /** |
| 21 | * Returns the current number of permits available in this semaphore. |
| 22 | */ |
| 23 | public val availablePermits: Int |
| 24 | |
| 25 | /** |
| 26 | * Acquires a permit from this semaphore, suspending until one is available. |
| 27 | * All suspending acquirers are processed in first-in-first-out (FIFO) order. |
| 28 | * |
| 29 | * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this |
| 30 | * function is suspended, this function immediately resumes with [CancellationException]. |
| 31 | * |
| 32 | * *Cancellation of suspended semaphore acquisition` is atomic* -- when this function |
| 33 | * throws [CancellationException] it means that the semaphore was not acquired. |
| 34 | * |
| 35 | * Note, that this function does not check for cancellation when it is not suspended. |
| 36 | * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. |
| 37 | * |
| 38 | * Use [tryAcquire] to try acquire a permit of this semaphore without suspension. |
| 39 | */ |
| 40 | public suspend fun acquire() |
| 41 | |
| 42 | /** |
| 43 | * Tries to acquire a permit from this semaphore without suspension. |
| 44 | * |
| 45 | * @return `true` if a permit was acquired, `false` otherwise. |
| 46 | */ |
| 47 | public fun tryAcquire(): Boolean |
| 48 | |
| 49 | /** |
| 50 | * Releases a permit, returning it into this semaphore. Resumes the first |
| 51 | * suspending acquirer if there is one at the point of invocation. |
| 52 | * Throws [IllegalStateException] if there is no acquired permit |
| 53 | * at the point of invocation. |
| 54 | */ |
| 55 | public fun release() |
| 56 | } |
| 57 | |
| 58 | /** |
| 59 | * Creates new [Semaphore] instance. |
| 60 | * @param permits the number of permits available in this semaphore. |
| 61 | * @param acquiredPermits the number of already acquired permits, |
| 62 | * should be between `0` and `permits` (inclusively). |
| 63 | */ |
| 64 | @Suppress("FunctionName") |
| 65 | public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits) |
| 66 | |
| 67 | /** |
| 68 | * Executes the given [action], acquiring a permit from this semaphore at the beginning |
| 69 | * and releasing it after the [action] is completed. |
| 70 | * |
| 71 | * @return the return value of the [action]. |
| 72 | */ |
| 73 | public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T { |
| 74 | acquire() |
| 75 | try { |
| 76 | return action() |
| 77 | } finally { |
| 78 | release() |
| 79 | } |
| 80 | } |
| 81 | |
| 82 | private class SemaphoreImpl( |
| 83 | private val permits: Int, acquiredPermits: Int |
| 84 | ) : Semaphore, SegmentQueue<SemaphoreSegment>() { |
| 85 | init { |
| 86 | require(permits > 0) { "Semaphore should have at least 1 permit" } |
| 87 | require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..permits" } |
| 88 | } |
| 89 | |
| 90 | override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev) |
| 91 | |
| 92 | /** |
| 93 | * This counter indicates a number of available permits if it is non-negative, |
| 94 | * or the size with minus sign otherwise. Note, that 32-bit counter is enough here |
| 95 | * since the maximal number of available permits is [permits] which is [Int], |
| 96 | * and the maximum number of waiting acquirers cannot be greater than 2^31 in any |
| 97 | * real application. |
| 98 | */ |
| 99 | private val _availablePermits = atomic(permits) |
| 100 | override val availablePermits: Int get() = max(_availablePermits.value, 0) |
| 101 | |
| 102 | // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`; |
| 103 | // each segment contains a fixed number of slots. To determine a slot for each enqueue |
| 104 | // and dequeue operation, we increment the corresponding counter at the beginning of the operation |
| 105 | // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair |
| 106 | // works with an individual cell. |
| 107 | private val enqIdx = atomic(0L) |
| 108 | private val deqIdx = atomic(0L) |
| 109 | |
| 110 | override fun tryAcquire(): Boolean { |
| 111 | _availablePermits.loop { p -> |
| 112 | if (p <= 0) return false |
| 113 | if (_availablePermits.compareAndSet(p, p - 1)) return true |
| 114 | } |
| 115 | } |
| 116 | |
| 117 | override suspend fun acquire() { |
| 118 | val p = _availablePermits.getAndDecrement() |
| 119 | if (p > 0) return // permit acquired |
| 120 | addToQueueAndSuspend() |
| 121 | } |
| 122 | |
| 123 | override fun release() { |
| 124 | val p = _availablePermits.getAndUpdate { cur -> |
| 125 | check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" } |
| 126 | cur + 1 |
| 127 | } |
| 128 | if (p >= 0) return // no waiters |
| 129 | resumeNextFromQueue() |
| 130 | } |
| 131 | |
| 132 | private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont -> |
| 133 | val last = this.tail |
| 134 | val enqIdx = enqIdx.getAndIncrement() |
| 135 | val segment = getSegment(last, enqIdx / SEGMENT_SIZE) |
| 136 | val i = (enqIdx % SEGMENT_SIZE).toInt() |
| 137 | if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) { |
| 138 | // already resumed |
| 139 | cont.resume(Unit) |
| 140 | return@sc |
| 141 | } |
| 142 | cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler) |
| 143 | } |
| 144 | |
| 145 | @Suppress("UNCHECKED_CAST") |
| 146 | private fun resumeNextFromQueue() { |
| 147 | val first = this.head |
| 148 | val deqIdx = deqIdx.getAndIncrement() |
| 149 | val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: return |
| 150 | val i = (deqIdx % SEGMENT_SIZE).toInt() |
| 151 | val cont = segment.getAndUpdate(i) { |
| 152 | // Cancelled continuation invokes `release` |
| 153 | // and resumes next suspended acquirer if needed. |
| 154 | if (it === CANCELLED) return |
| 155 | RESUMED |
| 156 | } |
| 157 | if (cont === null) return // just resumed |
| 158 | (cont as CancellableContinuation<Unit>).resume(Unit) |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | private class CancelSemaphoreAcquisitionHandler( |
| 163 | private val semaphore: Semaphore, |
| 164 | private val segment: SemaphoreSegment, |
| 165 | private val index: Int |
| 166 | ) : CancelHandler() { |
| 167 | override fun invoke(cause: Throwable?) { |
| 168 | segment.cancel(index) |
| 169 | semaphore.release() |
| 170 | } |
| 171 | |
| 172 | override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]" |
| 173 | } |
| 174 | |
| 175 | private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) { |
| 176 | private val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE) |
| 177 | |
| 178 | @Suppress("NOTHING_TO_INLINE") |
| 179 | inline fun get(index: Int): Any? = acquirers[index].value |
| 180 | |
| 181 | @Suppress("NOTHING_TO_INLINE") |
| 182 | inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) |
| 183 | |
| 184 | inline fun getAndUpdate(index: Int, function: (Any?) -> Any?): Any? { |
| 185 | while (true) { |
| 186 | val cur = acquirers[index].value |
| 187 | val upd = function(cur) |
| 188 | if (cas(index, cur, upd)) return cur |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | private val cancelledSlots = atomic(0) |
| 193 | override val removed get() = cancelledSlots.value == SEGMENT_SIZE |
| 194 | |
| 195 | // Cleans the acquirer slot located by the specified index |
| 196 | // and removes this segment physically if all slots are cleaned. |
| 197 | fun cancel(index: Int) { |
| 198 | // Clean the specified waiter |
| 199 | acquirers[index].value = CANCELLED |
| 200 | // Remove this segment if needed |
| 201 | if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE) |
| 202 | remove() |
| 203 | } |
| 204 | |
| 205 | override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" |
| 206 | } |
| 207 | |
| 208 | @SharedImmutable |
| 209 | private val RESUMED = Symbol("RESUMED") |
| 210 | @SharedImmutable |
| 211 | private val CANCELLED = Symbol("CANCELLED") |
| 212 | @SharedImmutable |
| 213 | private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16) |