blob: d72f989b6167095089d2f9e49e790cd2345967d7 [file] [log] [blame]
Roman Elizarov60f86882019-12-17 19:14:52 +03001/*
2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
Nikita Koval253e8eb2019-06-04 22:39:51 +02005package kotlinx.coroutines.sync
6
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +03007import kotlinx.atomicfu.*
Nikita Koval253e8eb2019-06-04 22:39:51 +02008import kotlinx.coroutines.*
9import kotlinx.coroutines.internal.*
Vsevolod Tolstopyatova3763e82019-08-06 16:26:27 +030010import kotlin.coroutines.*
11import kotlin.jvm.*
12import kotlin.math.*
Roman Elizarov60f86882019-12-17 19:14:52 +030013import kotlin.native.concurrent.*
Nikita Koval253e8eb2019-06-04 22:39:51 +020014
/**
 * A counting semaphore for coroutines that logically maintains a number of available permits.
 * Each [acquire] takes a single permit or suspends until one is available.
 * Each [release] adds a permit, potentially resuming a suspended acquirer.
 * The semaphore is fair and maintains a FIFO order of acquirers.
 *
 * Semaphores are mostly used to limit the number of coroutines that have access to a particular resource.
 * A semaphore with `permits = 1` is essentially a [Mutex].
 */
public interface Semaphore {
    /**
     * Returns the current number of permits available in this semaphore.
     */
    public val availablePermits: Int

    /**
     * Acquires a permit from this semaphore, suspending until one is available.
     * All suspending acquirers are processed in first-in-first-out (FIFO) order.
     *
     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed
     * while this function is suspended, this function immediately resumes with [CancellationException].
     *
     * *Cancellation of suspended semaphore acquisition is atomic* -- when this function
     * throws [CancellationException] it means that the semaphore was not acquired.
     *
     * Note that this function does not check for cancellation when it does not suspend.
     * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
     * check for cancellation in tight loops if needed.
     *
     * Use [tryAcquire] to try to acquire a permit of this semaphore without suspension.
     */
    public suspend fun acquire()

    /**
     * Tries to acquire a permit from this semaphore without suspension.
     *
     * @return `true` if a permit was acquired, `false` otherwise.
     */
    public fun tryAcquire(): Boolean

    /**
     * Releases a permit, returning it into this semaphore. Resumes the first
     * suspending acquirer if there is one at the point of invocation.
     *
     * @throws IllegalStateException if the number of [release] invocations is greater than
     * the number of preceding [acquire] invocations.
     */
    public fun release()
}
62
/**
 * Builds a new [Semaphore] that manages [permits] permits in total.
 *
 * @param permits the total number of permits managed by the semaphore; must be at least `1`.
 * @param acquiredPermits how many of those permits count as already taken at creation time;
 * must lie in `0..permits` (inclusive) and defaults to `0`.
 * @throws IllegalArgumentException if either argument violates the constraints above.
 */
@Suppress("FunctionName")
public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore =
    SemaphoreImpl(permits = permits, acquiredPermits = acquiredPermits)
71
/**
 * Runs [action] while holding one permit of this semaphore.
 *
 * A permit is taken with [Semaphore.acquire] before [action] starts and is handed back with
 * [Semaphore.release] once [action] finishes, whether it returns normally or throws.
 * If acquiring the permit itself fails, [action] is not invoked and nothing is released.
 *
 * @return whatever [action] returns.
 */
public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
    acquire()
    return try {
        action()
    } finally {
        // Always give the permit back, even when the action fails.
        release()
    }
}
86
/**
 * [Semaphore] implementation built on a single atomic permit counter plus a segment-based
 * queue of suspended acquirers (inherited from `SegmentQueue`).
 *
 * @param permits the total number of permits; must be positive.
 * @param acquiredPermits the number of permits treated as already acquired; must be in `0..permits`.
 */
private class SemaphoreImpl(
    private val permits: Int, acquiredPermits: Int
) : Semaphore, SegmentQueue<SemaphoreSegment>() {
    init {
        // Reject invalid configurations up front with IllegalArgumentException.
        require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
        require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
    }

    // Factory hook used by the segment queue to create segments of the semaphore-specific type.
    override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev)

    /**
     * This counter holds the number of available permits when it is non-negative,
     * or the negated number of waiting acquirers otherwise. Note that a 32-bit counter is enough here
     * since the maximal number of available permits is [permits], which is an [Int],
     * and the number of waiting acquirers cannot be greater than 2^31 in any
     * real application.
     */
    private val _availablePermits = atomic(permits - acquiredPermits)

    // A negative counter means "waiters present", which is reported to callers as zero available permits.
    override val availablePermits: Int get() = max(_availablePermits.value, 0)

    // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
    // each segment contains a fixed number of slots. To determine a slot for each enqueue
    // and dequeue operation, we increment the corresponding counter at the beginning of the operation
    // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
    // works with an individual cell.
    private val enqIdx = atomic(0L)
    private val deqIdx = atomic(0L)

    /**
     * Takes a permit only if the counter is currently positive; never enqueues or suspends.
     * Retries the compare-and-set until it either succeeds or observes a non-positive counter.
     */
    override fun tryAcquire(): Boolean {
        _availablePermits.loop { p ->
            if (p <= 0) return false
            if (_availablePermits.compareAndSet(p, p - 1)) return true
        }
    }

    /**
     * Unconditionally decrements the counter. If the value before the decrement was positive a permit
     * was available and is now owned by the caller; otherwise the caller registers itself in the queue
     * and suspends.
     */
    override suspend fun acquire() {
        val p = _availablePermits.getAndDecrement()
        if (p > 0) return // permit acquired
        addToQueueAndSuspend()
    }

    /**
     * Increments the counter and, if the value before the increment was negative
     * (i.e. at least one acquirer is waiting), passes the permit to the next queued acquirer.
     */
    override fun release() {
        val p = incPermits()
        if (p >= 0) return // no waiters
        resumeNextFromQueue()
    }

    /**
     * Atomically increments the permit counter and returns the value observed before the increment.
     *
     * @throws IllegalStateException if the counter is already at [permits], i.e. there is nothing to release.
     */
    fun incPermits() = _availablePermits.getAndUpdate { cur ->
        check(cur < permits) { "The number of released permits cannot be greater than $permits" }
        cur + 1
    }

    /**
     * Claims the next enqueue slot and tries to store the current continuation in it.
     * If the slot cannot be used for waiting, the continuation is resumed right away;
     * otherwise a cancellation handler is installed for the stored continuation.
     */
    private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
        val last = this.tail
        val enqIdx = enqIdx.getAndIncrement()
        val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
        val i = (enqIdx % SEGMENT_SIZE).toInt()
        // The slot is unusable when the segment could not be obtained, when a releaser has already
        // marked the slot RESUMED, or when the CAS from `null` loses a race with such a releaser.
        // NOTE(review): a `null` segment presumably means it was removed from the queue -- confirm in SegmentQueue.
        if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
            // already resumed
            cont.resume(Unit)
            return@sc
        }
        cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
    }

    /**
     * Claims dequeue slots one by one until a permit is successfully handed over:
     * a slot that is still empty is marked RESUMED so that the acquirer arriving later continues
     * without suspending, a slot holding a continuation gets that continuation resumed,
     * and slots that are cancelled or whose segment is unavailable are skipped.
     */
    @Suppress("UNCHECKED_CAST")
    internal fun resumeNextFromQueue() {
        try_again@while (true) {
            val first = this.head
            val deqIdx = deqIdx.getAndIncrement()
            // No segment for this index: move on to the next dequeue slot.
            val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@try_again
            val i = (deqIdx % SEGMENT_SIZE).toInt()
            val cont = segment.getAndSet(i, RESUMED)
            if (cont === null) return // just resumed
            // The acquirer in this slot was cancelled, so the permit must go to a later slot.
            if (cont === CANCELLED) continue@try_again
            (cont as CancellableContinuation<Unit>).resume(Unit)
            return
        }
    }
}
167
/**
 * Cancellation handler installed for an acquirer that is parked in slot [index] of [segment].
 *
 * On cancellation it first returns the acquirer's permit to the [semaphore] counter. When the counter
 * was negative before that increment, the slot is marked as cancelled; if the slot turns out to have
 * been resumed already, the permit is forwarded to the next acquirer in the queue.
 */
private class CancelSemaphoreAcquisitionHandler(
    private val semaphore: SemaphoreImpl,
    private val segment: SemaphoreSegment,
    private val index: Int
) : CancelHandler() {
    override fun invoke(cause: Throwable?) {
        val permitsBefore = semaphore.incPermits()
        // Short-circuit keeps the original order: the slot is touched only when the counter was negative,
        // and the permit is forwarded only when marking the slot as cancelled did not succeed.
        val forwardPermit = permitsBefore < 0 && !segment.cancel(index)
        if (forwardPermit) semaphore.resumeNextFromQueue()
    }

    override fun toString(): String = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
}
182
/**
 * One fixed-size chunk of the waiting queue: an array of [SEGMENT_SIZE] slots, each holding either
 * `null`, a waiting continuation, or one of the `RESUMED` / `CANCELLED` markers.
 */
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment<SemaphoreSegment>(id, prev) {
    val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)

    // Number of `cancel` calls made on this segment so far.
    private val cancelledSlots = atomic(0)

    // The segment counts as removed once every one of its slots has been cancelled.
    override val removed: Boolean
        get() = cancelledSlots.value == SEGMENT_SIZE

    /** Reads the current content of the slot at [index]. */
    @Suppress("NOTHING_TO_INLINE")
    inline fun get(index: Int): Any? = acquirers[index].value

    /** Atomically replaces [expected] with [value] in the slot at [index]; returns `true` on success. */
    @Suppress("NOTHING_TO_INLINE")
    inline fun cas(index: Int, expected: Any?, value: Any?): Boolean =
        acquirers[index].compareAndSet(expected, value)

    /** Atomically stores [value] in the slot at [index] and returns the slot's previous content. */
    @Suppress("NOTHING_TO_INLINE")
    inline fun getAndSet(index: Int, value: Any?): Any? = acquirers[index].getAndSet(value)

    /**
     * Marks the slot at [index] as cancelled and removes this segment physically
     * once all of its slots have been cancelled.
     *
     * @return `true` if the slot had not been resumed before it was marked, `false` if it already
     * contained the `RESUMED` marker.
     */
    fun cancel(index: Int): Boolean {
        // Overwrite the slot first and remember what was there.
        val previous = getAndSet(index, CANCELLED)
        // Then account for the cancellation and drop the segment if it was the last live slot.
        val everySlotCancelled = cancelledSlots.incrementAndGet() == SEGMENT_SIZE
        if (everySlotCancelled) remove()
        return previous !== RESUMED
    }

    override fun toString(): String = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
}
210
// Slot marker written by a releaser: the permit for this slot has already been handed over,
// so an acquirer that finds it must not suspend.
@SharedImmutable
private val RESUMED = Symbol("RESUMED")
// Slot marker written when the acquirer that owned the slot was cancelled; releasers skip such slots.
@SharedImmutable
private val CANCELLED = Symbol("CANCELLED")
// Number of slots per queue segment.
// NOTE(review): presumably read from the `kotlinx.coroutines.semaphore.segmentSize` system property
// with a default of 16 -- confirm against `systemProp`.
@SharedImmutable
private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)