blob: a9df15cf49172eaa28d7ca0f734a2b049ae761e5 [file] [log] [blame]
Nikita Koval253e8eb2019-06-04 22:39:51 +02001package kotlinx.coroutines.sync
2
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +03003import kotlinx.atomicfu.*
Nikita Koval253e8eb2019-06-04 22:39:51 +02004import kotlinx.coroutines.*
5import kotlinx.coroutines.internal.*
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +03006import kotlin.coroutines.*
7import kotlin.jvm.*
8import kotlin.math.*
Nikita Koval253e8eb2019-06-04 22:39:51 +02009
10/**
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030011 * A counting semaphore for coroutines that logically maintains a number of available permits.
12 * Each [acquire] takes a single permit or suspends until it is available.
Nikita Koval253e8eb2019-06-04 22:39:51 +020013 * Each [release] adds a permit, potentially releasing a suspended acquirer.
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030014 * Semaphore is fair and maintains a FIFO order of acquirers.
Nikita Koval253e8eb2019-06-04 22:39:51 +020015 *
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030016 * Semaphores are mostly used to limit the number of coroutines that have an access to particular resource.
Nikita Koval253e8eb2019-06-04 22:39:51 +020017 * Semaphore with `permits = 1` is essentially a [Mutex].
18 **/
19public interface Semaphore {
20 /**
21 * Returns the current number of permits available in this semaphore.
22 */
23 public val availablePermits: Int
24
25 /**
26 * Acquires a permit from this semaphore, suspending until one is available.
27 * All suspending acquirers are processed in first-in-first-out (FIFO) order.
28 *
29 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
30 * function is suspended, this function immediately resumes with [CancellationException].
31 *
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030032 * *Cancellation of suspended semaphore acquisition is atomic* -- when this function
Nikita Koval253e8eb2019-06-04 22:39:51 +020033 * throws [CancellationException] it means that the semaphore was not acquired.
34 *
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030035 * Note, that this function does not check for cancellation when it does not suspend.
36 * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
37 * check for cancellation in tight loops if needed.
Nikita Koval253e8eb2019-06-04 22:39:51 +020038 *
39 * Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
40 */
41 public suspend fun acquire()
42
43 /**
44 * Tries to acquire a permit from this semaphore without suspension.
45 *
46 * @return `true` if a permit was acquired, `false` otherwise.
47 */
48 public fun tryAcquire(): Boolean
49
50 /**
51 * Releases a permit, returning it into this semaphore. Resumes the first
52 * suspending acquirer if there is one at the point of invocation.
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030053 * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
Nikita Koval253e8eb2019-06-04 22:39:51 +020054 */
55 public fun release()
56}
57
58/**
59 * Creates new [Semaphore] instance.
60 * @param permits the number of permits available in this semaphore.
61 * @param acquiredPermits the number of already acquired permits,
62 * should be between `0` and `permits` (inclusively).
63 */
64@Suppress("FunctionName")
65public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
66
67/**
68 * Executes the given [action], acquiring a permit from this semaphore at the beginning
69 * and releasing it after the [action] is completed.
70 *
71 * @return the return value of the [action].
72 */
73public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
74 acquire()
75 try {
76 return action()
77 } finally {
78 release()
79 }
80}
81
82private class SemaphoreImpl(
83 private val permits: Int, acquiredPermits: Int
84) : Semaphore, SegmentQueue<SemaphoreSegment>() {
85 init {
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030086 require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
87 require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
Nikita Koval253e8eb2019-06-04 22:39:51 +020088 }
89
90 override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)
91
92 /**
93 * This counter indicates a number of available permits if it is non-negative,
94 * or the size with minus sign otherwise. Note, that 32-bit counter is enough here
95 * since the maximal number of available permits is [permits] which is [Int],
96 * and the maximum number of waiting acquirers cannot be greater than 2^31 in any
97 * real application.
98 */
Turing Technologies (Wynne Plaga)420fed92019-08-07 23:02:59 -040099 private val _availablePermits = atomic(permits - acquiredPermits)
Nikita Koval253e8eb2019-06-04 22:39:51 +0200100 override val availablePermits: Int get() = max(_availablePermits.value, 0)
101
102 // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
103 // each segment contains a fixed number of slots. To determine a slot for each enqueue
104 // and dequeue operation, we increment the corresponding counter at the beginning of the operation
105 // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
106 // works with an individual cell.
107 private val enqIdx = atomic(0L)
108 private val deqIdx = atomic(0L)
109
110 override fun tryAcquire(): Boolean {
111 _availablePermits.loop { p ->
112 if (p <= 0) return false
113 if (_availablePermits.compareAndSet(p, p - 1)) return true
114 }
115 }
116
117 override suspend fun acquire() {
118 val p = _availablePermits.getAndDecrement()
119 if (p > 0) return // permit acquired
120 addToQueueAndSuspend()
121 }
122
123 override fun release() {
Nikita Koval9077b012019-07-05 12:18:35 +0300124 val p = incPermits()
Nikita Koval253e8eb2019-06-04 22:39:51 +0200125 if (p >= 0) return // no waiters
126 resumeNextFromQueue()
127 }
128
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +0300129 fun incPermits() = _availablePermits.getAndUpdate { cur ->
130 check(cur < permits) { "The number of released permits cannot be greater than $permits" }
Nikita Koval9077b012019-07-05 12:18:35 +0300131 cur + 1
132 }
133
Nikita Koval253e8eb2019-06-04 22:39:51 +0200134 private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
135 val last = this.tail
136 val enqIdx = enqIdx.getAndIncrement()
137 val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
138 val i = (enqIdx % SEGMENT_SIZE).toInt()
139 if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
140 // already resumed
141 cont.resume(Unit)
142 return@sc
143 }
144 cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
145 }
146
147 @Suppress("UNCHECKED_CAST")
Nikita Koval9077b012019-07-05 12:18:35 +0300148 internal fun resumeNextFromQueue() {
149 try_again@while (true) {
150 val first = this.head
151 val deqIdx = deqIdx.getAndIncrement()
152 val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@try_again
153 val i = (deqIdx % SEGMENT_SIZE).toInt()
154 val cont = segment.getAndSet(i, RESUMED)
155 if (cont === null) return // just resumed
156 if (cont === CANCELLED) continue@try_again
157 (cont as CancellableContinuation<Unit>).resume(Unit)
158 return
Nikita Koval253e8eb2019-06-04 22:39:51 +0200159 }
Nikita Koval253e8eb2019-06-04 22:39:51 +0200160 }
161}
162
163private class CancelSemaphoreAcquisitionHandler(
Nikita Koval9077b012019-07-05 12:18:35 +0300164 private val semaphore: SemaphoreImpl,
Nikita Koval253e8eb2019-06-04 22:39:51 +0200165 private val segment: SemaphoreSegment,
166 private val index: Int
167) : CancelHandler() {
168 override fun invoke(cause: Throwable?) {
dkhalanskyjb9a62f272019-09-03 16:43:23 +0300169 val p = semaphore.incPermits()
170 if (p >= 0) return
Nikita Koval9077b012019-07-05 12:18:35 +0300171 if (segment.cancel(index)) return
172 semaphore.resumeNextFromQueue()
Nikita Koval253e8eb2019-06-04 22:39:51 +0200173 }
174
175 override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
176}
177
178private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
Nikita Koval9077b012019-07-05 12:18:35 +0300179 val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +0300180 private val cancelledSlots = atomic(0)
181 override val removed get() = cancelledSlots.value == SEGMENT_SIZE
Nikita Koval253e8eb2019-06-04 22:39:51 +0200182
183 @Suppress("NOTHING_TO_INLINE")
184 inline fun get(index: Int): Any? = acquirers[index].value
185
186 @Suppress("NOTHING_TO_INLINE")
187 inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
188
Nikita Koval9077b012019-07-05 12:18:35 +0300189 @Suppress("NOTHING_TO_INLINE")
190 inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
Nikita Koval253e8eb2019-06-04 22:39:51 +0200191
Nikita Koval253e8eb2019-06-04 22:39:51 +0200192 // Cleans the acquirer slot located by the specified index
193 // and removes this segment physically if all slots are cleaned.
Nikita Koval9077b012019-07-05 12:18:35 +0300194 fun cancel(index: Int): Boolean {
195 // Try to cancel the slot
196 val cancelled = getAndSet(index, CANCELLED) !== RESUMED
Nikita Koval253e8eb2019-06-04 22:39:51 +0200197 // Remove this segment if needed
198 if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
199 remove()
Nikita Koval9077b012019-07-05 12:18:35 +0300200 return cancelled
Nikita Koval253e8eb2019-06-04 22:39:51 +0200201 }
202
203 override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
204}
205
206@SharedImmutable
207private val RESUMED = Symbol("RESUMED")
208@SharedImmutable
209private val CANCELLED = Symbol("CANCELLED")
210@SharedImmutable
211private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)