Roman Elizarov | 60f8688 | 2019-12-17 19:14:52 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | */ |
| 4 | |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 5 | package kotlinx.coroutines.sync |
| 6 | |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 7 | import kotlinx.atomicfu.* |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 8 | import kotlinx.coroutines.* |
| 9 | import kotlinx.coroutines.internal.* |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 10 | import kotlin.coroutines.* |
| 11 | import kotlin.jvm.* |
| 12 | import kotlin.math.* |
Roman Elizarov | 60f8688 | 2019-12-17 19:14:52 +0300 | [diff] [blame] | 13 | import kotlin.native.concurrent.* |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 14 | |
| 15 | /** |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 16 | * A counting semaphore for coroutines that logically maintains a number of available permits. |
| 17 | * Each [acquire] takes a single permit or suspends until it is available. |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 18 | * Each [release] adds a permit, potentially releasing a suspended acquirer. |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 19 | * Semaphore is fair and maintains a FIFO order of acquirers. |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 20 | * |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 21 | * Semaphores are mostly used to limit the number of coroutines that have an access to particular resource. |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 22 | * Semaphore with `permits = 1` is essentially a [Mutex]. |
| 23 | **/ |
| 24 | public interface Semaphore { |
| 25 | /** |
| 26 | * Returns the current number of permits available in this semaphore. |
| 27 | */ |
| 28 | public val availablePermits: Int |
| 29 | |
| 30 | /** |
| 31 | * Acquires a permit from this semaphore, suspending until one is available. |
| 32 | * All suspending acquirers are processed in first-in-first-out (FIFO) order. |
| 33 | * |
| 34 | * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this |
| 35 | * function is suspended, this function immediately resumes with [CancellationException]. |
| 36 | * |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 37 | * *Cancellation of suspended semaphore acquisition is atomic* -- when this function |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 38 | * throws [CancellationException] it means that the semaphore was not acquired. |
| 39 | * |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 40 | * Note, that this function does not check for cancellation when it does not suspend. |
| 41 | * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically |
| 42 | * check for cancellation in tight loops if needed. |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 43 | * |
| 44 | * Use [tryAcquire] to try acquire a permit of this semaphore without suspension. |
| 45 | */ |
| 46 | public suspend fun acquire() |
| 47 | |
| 48 | /** |
| 49 | * Tries to acquire a permit from this semaphore without suspension. |
| 50 | * |
| 51 | * @return `true` if a permit was acquired, `false` otherwise. |
| 52 | */ |
| 53 | public fun tryAcquire(): Boolean |
| 54 | |
| 55 | /** |
| 56 | * Releases a permit, returning it into this semaphore. Resumes the first |
| 57 | * suspending acquirer if there is one at the point of invocation. |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 58 | * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire]. |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 59 | */ |
| 60 | public fun release() |
| 61 | } |
| 62 | |
| 63 | /** |
| 64 | * Creates new [Semaphore] instance. |
| 65 | * @param permits the number of permits available in this semaphore. |
| 66 | * @param acquiredPermits the number of already acquired permits, |
| 67 | * should be between `0` and `permits` (inclusively). |
| 68 | */ |
| 69 | @Suppress("FunctionName") |
| 70 | public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits) |
| 71 | |
| 72 | /** |
| 73 | * Executes the given [action], acquiring a permit from this semaphore at the beginning |
| 74 | * and releasing it after the [action] is completed. |
| 75 | * |
| 76 | * @return the return value of the [action]. |
| 77 | */ |
| 78 | public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T { |
| 79 | acquire() |
| 80 | try { |
| 81 | return action() |
| 82 | } finally { |
| 83 | release() |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | private class SemaphoreImpl( |
| 88 | private val permits: Int, acquiredPermits: Int |
| 89 | ) : Semaphore, SegmentQueue<SemaphoreSegment>() { |
| 90 | init { |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 91 | require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } |
| 92 | require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 93 | } |
| 94 | |
| 95 | override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev) |
| 96 | |
| 97 | /** |
| 98 | * This counter indicates a number of available permits if it is non-negative, |
| 99 | * or the size with minus sign otherwise. Note, that 32-bit counter is enough here |
| 100 | * since the maximal number of available permits is [permits] which is [Int], |
| 101 | * and the maximum number of waiting acquirers cannot be greater than 2^31 in any |
| 102 | * real application. |
| 103 | */ |
Turing Technologies (Wynne Plaga) | 420fed9 | 2019-08-07 23:02:59 -0400 | [diff] [blame] | 104 | private val _availablePermits = atomic(permits - acquiredPermits) |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 105 | override val availablePermits: Int get() = max(_availablePermits.value, 0) |
| 106 | |
| 107 | // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`; |
| 108 | // each segment contains a fixed number of slots. To determine a slot for each enqueue |
| 109 | // and dequeue operation, we increment the corresponding counter at the beginning of the operation |
| 110 | // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair |
| 111 | // works with an individual cell. |
| 112 | private val enqIdx = atomic(0L) |
| 113 | private val deqIdx = atomic(0L) |
| 114 | |
| 115 | override fun tryAcquire(): Boolean { |
| 116 | _availablePermits.loop { p -> |
| 117 | if (p <= 0) return false |
| 118 | if (_availablePermits.compareAndSet(p, p - 1)) return true |
| 119 | } |
| 120 | } |
| 121 | |
| 122 | override suspend fun acquire() { |
| 123 | val p = _availablePermits.getAndDecrement() |
| 124 | if (p > 0) return // permit acquired |
| 125 | addToQueueAndSuspend() |
| 126 | } |
| 127 | |
| 128 | override fun release() { |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 129 | val p = incPermits() |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 130 | if (p >= 0) return // no waiters |
| 131 | resumeNextFromQueue() |
| 132 | } |
| 133 | |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 134 | fun incPermits() = _availablePermits.getAndUpdate { cur -> |
| 135 | check(cur < permits) { "The number of released permits cannot be greater than $permits" } |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 136 | cur + 1 |
| 137 | } |
| 138 | |
Vsevolod Tolstopyatov | 946e578 | 2019-09-25 18:02:24 +0300 | [diff] [blame] | 139 | private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont -> |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 140 | val last = this.tail |
| 141 | val enqIdx = enqIdx.getAndIncrement() |
| 142 | val segment = getSegment(last, enqIdx / SEGMENT_SIZE) |
| 143 | val i = (enqIdx % SEGMENT_SIZE).toInt() |
| 144 | if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) { |
| 145 | // already resumed |
| 146 | cont.resume(Unit) |
| 147 | return@sc |
| 148 | } |
| 149 | cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler) |
| 150 | } |
| 151 | |
| 152 | @Suppress("UNCHECKED_CAST") |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 153 | internal fun resumeNextFromQueue() { |
| 154 | try_again@while (true) { |
| 155 | val first = this.head |
| 156 | val deqIdx = deqIdx.getAndIncrement() |
| 157 | val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@try_again |
| 158 | val i = (deqIdx % SEGMENT_SIZE).toInt() |
| 159 | val cont = segment.getAndSet(i, RESUMED) |
| 160 | if (cont === null) return // just resumed |
| 161 | if (cont === CANCELLED) continue@try_again |
| 162 | (cont as CancellableContinuation<Unit>).resume(Unit) |
| 163 | return |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 164 | } |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 165 | } |
| 166 | } |
| 167 | |
| 168 | private class CancelSemaphoreAcquisitionHandler( |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 169 | private val semaphore: SemaphoreImpl, |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 170 | private val segment: SemaphoreSegment, |
| 171 | private val index: Int |
| 172 | ) : CancelHandler() { |
| 173 | override fun invoke(cause: Throwable?) { |
dkhalanskyjb | 9a62f27 | 2019-09-03 16:43:23 +0300 | [diff] [blame] | 174 | val p = semaphore.incPermits() |
| 175 | if (p >= 0) return |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 176 | if (segment.cancel(index)) return |
| 177 | semaphore.resumeNextFromQueue() |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 178 | } |
| 179 | |
| 180 | override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]" |
| 181 | } |
| 182 | |
| 183 | private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) { |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 184 | val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE) |
Vsevolod Tolstopyatov | a3763e8 | 2019-08-06 16:26:27 +0300 | [diff] [blame] | 185 | private val cancelledSlots = atomic(0) |
| 186 | override val removed get() = cancelledSlots.value == SEGMENT_SIZE |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 187 | |
| 188 | @Suppress("NOTHING_TO_INLINE") |
| 189 | inline fun get(index: Int): Any? = acquirers[index].value |
| 190 | |
| 191 | @Suppress("NOTHING_TO_INLINE") |
| 192 | inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) |
| 193 | |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 194 | @Suppress("NOTHING_TO_INLINE") |
| 195 | inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 196 | |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 197 | // Cleans the acquirer slot located by the specified index |
| 198 | // and removes this segment physically if all slots are cleaned. |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 199 | fun cancel(index: Int): Boolean { |
| 200 | // Try to cancel the slot |
| 201 | val cancelled = getAndSet(index, CANCELLED) !== RESUMED |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 202 | // Remove this segment if needed |
| 203 | if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE) |
| 204 | remove() |
Nikita Koval | 9077b01 | 2019-07-05 12:18:35 +0300 | [diff] [blame] | 205 | return cancelled |
Nikita Koval | 253e8eb | 2019-06-04 22:39:51 +0200 | [diff] [blame] | 206 | } |
| 207 | |
| 208 | override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" |
| 209 | } |
| 210 | |
| 211 | @SharedImmutable |
| 212 | private val RESUMED = Symbol("RESUMED") |
| 213 | @SharedImmutable |
| 214 | private val CANCELLED = Symbol("CANCELLED") |
| 215 | @SharedImmutable |
| 216 | private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16) |