/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kotlinx.coroutines.experimental.sync

import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
import kotlinx.coroutines.experimental.internalAnnotations.*

/**
 * Mutual exclusion for coroutines.
 *
 * Mutex has two states: _locked_ and _unlocked_.
 * It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds
 * the lock still suspends the invoker.
 *
 * JVM API note:
 * Memory semantic of the [Mutex] is similar to `synchronized` block on JVM:
 * An unlock on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
 * An unsuccessful call to [tryLock] does not have any memory effects.
 */
public interface Mutex {
    /**
     * Returns `true` when this mutex is locked.
     */
    public val isLocked: Boolean

    /**
     * Tries to lock this mutex, returning `false` if this mutex is already locked.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        is already locked with the same token (same identity), this function throws [IllegalStateException].
     */
    public fun tryLock(owner: Any? = null): Boolean

    /**
     * Locks this mutex, suspending caller while the mutex is locked.
     *
     * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
     * function is suspended, this function immediately resumes with [CancellationException].
     *
     * *Cancellation of suspended lock invocation is atomic* -- when this function
     * throws [CancellationException] it means that the mutex was not locked.
     * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
     * continue to execute even after it was cancelled from the same thread in the case when this lock operation
     * was already resumed and the continuation was posted for execution to the thread's queue.
     *
     * Note, that this function does not check for cancellation when it is not suspended.
     * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
     *
     * This function can be used in [select] invocation with [onLock] clause.
     * Use [tryLock] to try to acquire the lock without waiting.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        is already locked with the same token (same identity), this function throws [IllegalStateException].
     */
    public suspend fun lock(owner: Any? = null)

    /**
     * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
     * Additional parameter for the clause is the `owner` (see [lock]) and when the clause is selected
     * the reference to this mutex is passed into the corresponding block.
     */
    public val onLock: SelectClause2<Any?, Mutex>

    /**
     * Checks whether this mutex is locked by the given [owner].
     *
     * @return `true` if this mutex is locked by [owner], `false` if it is not locked
     *         or it is locked by a different owner.
     */
    public fun holdsLock(owner: Any): Boolean

    /**
     * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked.
     *
     * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
     *        was locked with a different token (by identity), this function throws [IllegalStateException].
     */
    public fun unlock(owner: Any? = null)
}

/**
 * Creates new [Mutex] instance.
 * The mutex created is fair: lock is granted in first come, first served order.
 *
 * @param locked initial state of the mutex.
 */
@Suppress("FunctionName")
public fun Mutex(locked: Boolean = false): Mutex =
    MutexImpl(locked)

/**
 * Executes the given [action] under this mutex's lock.
 *
 * The lock is acquired before the `try` block is entered, so if [Mutex.lock] itself throws,
 * [Mutex.unlock] is not invoked. Once the lock is held it is released in `finally`, whether
 * [action] returns normally or throws.
 *
 * @param owner Optional owner token for debugging. The same token is passed to both
 *        [Mutex.lock] and [Mutex.unlock].
 *
 * @return the return value of the action.
 */
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

/**
 * Non-inline overload taking a suspending [action]; delegates to the inline `withLock`.
 *
 * @suppress: **Deprecated**: binary compatibility with old code
 */
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <T> Mutex.withLock(owner: Any? = null, action: suspend () -> T): T =
    withLock(owner) { action() }

/**
 * Owner-less overload taking a suspending [action]; delegates to the inline `withLock`.
 *
 * @suppress: **Deprecated**: Use [withLock]
 */
@Deprecated("Use `withLock(owner, action)`", level = DeprecationLevel.HIDDEN)
public suspend fun <T> Mutex.withLock(action: suspend () -> T): T =
    withLock { action() }

/**
 * Old name of [withLock]; runs [action] under this mutex's lock without an owner token.
 *
 * @suppress: **Deprecated**: Use [withLock]
 */
@Deprecated("Use `withLock`", replaceWith = ReplaceWith("withLock(action)"))
public suspend fun <T> Mutex.withMutex(action: suspend () -> T): T =
    withLock { action() }

// Failure marker returned by TryLockDesc.prepare when the state was not EmptyUnlocked.
private val LOCK_FAIL = Symbol("LOCK_FAIL")
// Failure marker returned by TryEnqueueLockDesc.onPrepare when the state is no longer the expected queue.
private val ENQUEUE_FAIL = Symbol("ENQUEUE_FAIL")
// Failure marker returned by UnlockOp.perform when the state was restored to the (non-empty) queue.
private val UNLOCK_FAIL = Symbol("UNLOCK_FAIL")
// Resume token produced by LockSelect.tryResumeLockWaiter on a successful trySelect.
private val SELECT_SUCCESS = Symbol("SELECT_SUCCESS")
// Value stored as the "owner" when the mutex is locked without an owner token.
private val LOCKED = Symbol("LOCKED")
// Value stored in Empty.locked when the mutex is not locked.
private val UNLOCKED = Symbol("UNLOCKED")
// Values of MutexImpl._resumeNext: no resumption in progress / a resumption is in progress.
private val RESUME_QUIESCENT = Symbol("RESUME_QUIESCENT")
private val RESUME_ACTIVE = Symbol("RESUME_ACTIVE")

// Shared state objects used while the mutex has no owner token and no waiters.
private val EmptyLocked = Empty(LOCKED)
private val EmptyUnlocked = Empty(UNLOCKED)

/**
 * Mutex state without a queue of waiters.
 *
 * @param locked the `UNLOCKED` marker when the mutex is free; otherwise the `LOCKED` marker
 *        or the owner token the mutex was locked with.
 */
private class Empty(
    @JvmField val locked: Any
) {
    override fun toString(): String = "Empty[$locked]"
}

/**
 * Lock-free implementation of [Mutex] that also serves as its own [onLock] select clause.
 *
 * The whole mutex state lives in a single atomic reference that holds one of:
 *  - [Empty] -- no waiters queue; `locked` is `UNLOCKED`, `LOCKED` or the owner token;
 *  - [LockedQueue] -- the mutex is locked and waiters are (or were) queued on this list head;
 *  - `OpDescriptor` -- an atomic operation is in progress; any thread that observes it
 *    invokes `perform` on it ("helps") and then retries.
 *
 * @param locked initial state of the mutex.
 */
internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
    // State is: Empty | LockedQueue | OpDescriptor
    // shared objects while we have no waiters
    private val _state = atomic<Any?>(if (locked) EmptyLocked else EmptyUnlocked)

    // resumeNext is: RESUME_QUIESCENT | RESUME_ACTIVE | ResumeReq
    private val _resumeNext = atomic<Any>(RESUME_QUIESCENT)

    public override val isLocked: Boolean get() {
        _state.loop { state ->
            when (state) {
                is Empty -> return state.locked !== UNLOCKED
                is LockedQueue -> return true
                is OpDescriptor -> state.perform(this) // help
                else -> error("Illegal state $state")
            }
        }
    }

    // for tests ONLY
    internal val isLockedEmptyQueueState: Boolean get() {
        val state = _state.value
        return state is LockedQueue && state.isEmpty
    }

    /**
     * Tries to take the lock without suspending.
     *
     * @return `true` if the lock was acquired, `false` if the mutex is locked by somebody else.
     * @throws IllegalStateException if a non-null [owner] already holds this mutex, regardless of
     *         whether the state currently has a waiters queue or not.
     */
    public override fun tryLock(owner: Any?): Boolean {
        _state.loop { state ->
            when (state) {
                is Empty -> {
                    if (state.locked !== UNLOCKED) {
                        // Same check as in the LockedQueue branch below, so that the outcome does not
                        // depend on whether a queue was ever installed. A null owner never matches,
                        // because `locked` is always non-null.
                        check(state.locked !== owner) { "Already locked by $owner" }
                        return false
                    }
                    val update = if (owner == null) EmptyLocked else Empty(owner)
                    if (_state.compareAndSet(state, update)) return true
                }
                is LockedQueue -> {
                    check(state.owner !== owner) { "Already locked by $owner" }
                    return false
                }
                is OpDescriptor -> state.perform(this) // help
                else -> error("Illegal state $state")
            }
        }
    }

    public override suspend fun lock(owner: Any?) {
        // fast-path -- try lock
        if (tryLock(owner)) return
        // slow-path -- suspend
        return lockSuspend(owner)
    }

    // Slow path of lock: either takes the lock directly or enqueues a LockCont waiter and suspends.
    private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutine<Unit>(holdCancellability = true) sc@ { cont ->
        val waiter = LockCont(owner, cont)
        _state.loop { state ->
            when (state) {
                is Empty -> {
                    if (state.locked !== UNLOCKED) { // try upgrade to queue & retry
                        _state.compareAndSet(state, LockedQueue(state.locked))
                    } else {
                        // try lock
                        val update = if (owner == null) EmptyLocked else Empty(owner)
                        if (_state.compareAndSet(state, update)) { // locked
                            cont.resume(Unit)
                            return@sc
                        }
                    }
                }
                is LockedQueue -> {
                    val curOwner = state.owner
                    check(curOwner !== owner) { "Already locked by $owner" }
                    // enqueue only while this queue is still the current state
                    if (state.addLastIf(waiter, { _state.value === state })) {
                        // added to waiter list!
                        cont.initCancellability() // make it properly cancellable
                        cont.removeOnCancellation(waiter)
                        return@sc
                    }
                }
                is OpDescriptor -> state.perform(this) // help
                else -> error("Illegal state $state")
            }
        }
    }

    override val onLock: SelectClause2<Any?, Mutex>
        get() = this

    // registerSelectLock
    @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
    override fun <R> registerSelectClause2(select: SelectInstance<R>, owner: Any?, block: suspend (Mutex) -> R) {
        while (true) { // lock-free loop on state
            if (select.isSelected) return
            val state = _state.value
            when (state) {
                is Empty -> {
                    if (state.locked !== UNLOCKED) { // try upgrade to queue & retry
                        _state.compareAndSet(state, LockedQueue(state.locked))
                    } else {
                        // try lock
                        val failure = select.performAtomicTrySelect(TryLockDesc(this, owner))
                        when {
                            failure == null -> { // success
                                block.startCoroutineUndispatched(receiver = this, completion = select.completion)
                                return
                            }
                            failure === ALREADY_SELECTED -> return // already selected -- bail out
                            failure === LOCK_FAIL -> {} // retry
                            else -> error("performAtomicTrySelect(TryLockDesc) returned $failure")
                        }
                    }
                }
                is LockedQueue -> {
                    check(state.owner !== owner) { "Already locked by $owner" }
                    val enqueueOp = TryEnqueueLockDesc(this, owner, state, select, block)
                    val failure = select.performAtomicIfNotSelected(enqueueOp)
                    when {
                        failure == null -> { // successfully enqueued
                            select.disposeOnSelect(enqueueOp.node)
                            return
                        }
                        failure === ALREADY_SELECTED -> return // already selected -- bail out
                        failure === ENQUEUE_FAIL -> {} // retry
                        else -> error("performAtomicIfNotSelected(TryEnqueueLockDesc) returned $failure")
                    }
                }
                is OpDescriptor -> state.perform(this) // help
                else -> error("Illegal state $state")
            }
        }
    }

    // Atomic descriptor that moves the state from EmptyUnlocked to a locked Empty as part of a select.
    private class TryLockDesc(
        @JvmField val mutex: MutexImpl,
        @JvmField val owner: Any?
    ) : AtomicDesc() {
        // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation
        private inner class PrepareOp(private val op: AtomicOp<*>) : OpDescriptor() {
            override fun perform(affected: Any?): Any? {
                val update: Any = if (op.isDecided) EmptyUnlocked else op // restore if was already decided
                (affected as MutexImpl)._state.compareAndSet(this, update)
                return null // ok
            }
        }

        override fun prepare(op: AtomicOp<*>): Any? {
            val prepare = PrepareOp(op)
            if (!mutex._state.compareAndSet(EmptyUnlocked, prepare)) return LOCK_FAIL
            return prepare.perform(mutex)
        }

        override fun complete(op: AtomicOp<*>, failure: Any?) {
            // on failure roll back to unlocked, on success install the locked state
            val update = if (failure != null) EmptyUnlocked else {
                if (owner == null) EmptyLocked else Empty(owner)
            }
            mutex._state.compareAndSet(op, update)
        }
    }

    // Atomic descriptor that appends a LockSelect waiter to the queue while it is still the current state.
    private class TryEnqueueLockDesc<R>(
        @JvmField val mutex: MutexImpl,
        owner: Any?,
        queue: LockedQueue,
        select: SelectInstance<R>,
        block: suspend (Mutex) -> R
    ) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, mutex, select, block)) {
        override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
            if (mutex._state.value !== queue) return ENQUEUE_FAIL
            return super.onPrepare(affected, next)
        }
    }

    // Single snapshot read of the state; an in-progress OpDescriptor is not helped and yields `false`.
    public override fun holdsLock(owner: Any) =
        _state.value.let { state ->
            when (state) {
                is Empty -> state.locked === owner
                is LockedQueue -> state.owner === owner
                else -> false
            }
        }

    public override fun unlock(owner: Any?) {
        _state.loop { state ->
            when (state) {
                is Empty -> {
                    if (owner == null)
                        check(state.locked !== UNLOCKED) { "Mutex is not locked" }
                    else
                        check(state.locked === owner) { "Mutex is locked by ${state.locked} but expected $owner" }
                    if (_state.compareAndSet(state, EmptyUnlocked)) return
                }
                is OpDescriptor -> state.perform(this)
                is LockedQueue -> {
                    if (owner != null)
                        check(state.owner === owner) { "Mutex is locked by ${state.owner} but expected $owner" }
                    val waiter = state.removeFirstOrNull()
                    if (waiter == null) {
                        // no waiters -- try to atomically replace the empty queue with the unlocked state
                        val op = UnlockOp(state)
                        if (_state.compareAndSet(state, op) && op.perform(this) == null) return
                    } else {
                        val token = (waiter as LockWaiter).tryResumeLockWaiter()
                        if (token != null) {
                            // successfully resumed waiter that now is holding the lock
                            // we must immediately transfer ownership to the next waiter, because this coroutine
                            // might try to lock it again after unlock returns due to StackOverflow avoidance code
                            // and its attempts to take a lock must be queued.
                            state.owner = waiter.owner ?: LOCKED
                            // StackOverflow avoidance code
                            if (startResumeNext(waiter, token)) {
                                waiter.completeResumeLockWaiter(token)
                                finishResumeNext()
                            }
                            return
                        }
                        // token == null: this waiter could not be resumed -- loop and try the next one
                    }
                }
                else -> error("Illegal state $state")
            }
        }
    }

    // Resumption request left in _resumeNext for the invocation that is currently running finishResumeNext.
    private class ResumeReq(
        @JvmField val waiter: LockWaiter,
        @JvmField val token: Any
    )

    // Returns `true` when the caller must complete the resumption itself (and then call finishResumeNext),
    // `false` when the request was handed over to an already active resumption loop.
    private fun startResumeNext(waiter: LockWaiter, token: Any): Boolean {
        _resumeNext.loop { resumeNext ->
            when {
                resumeNext === RESUME_QUIESCENT -> {
                    // this is never concurrent, because only one thread is holding mutex and trying to resume
                    // next waiter, so no need to CAS here
                    _resumeNext.value = RESUME_ACTIVE
                    return true
                }
                resumeNext === RESUME_ACTIVE ->
                    if (_resumeNext.compareAndSet(resumeNext, ResumeReq(waiter, token))) return false
                else -> error("Cannot happen")
            }
        }
    }

    private fun finishResumeNext() {
        // also a resumption loop to fulfill requests of inner resume invokes
        _resumeNext.loop { resumeNext ->
            when {
                resumeNext === RESUME_ACTIVE ->
                    if (_resumeNext.compareAndSet(resumeNext, RESUME_QUIESCENT)) return
                resumeNext is ResumeReq -> {
                    // this is never concurrently, only one thread is finishing, so no need to CAS here
                    _resumeNext.value = RESUME_ACTIVE
                    resumeNext.waiter.completeResumeLockWaiter(resumeNext.token)
                }
                else -> error("Cannot happen")
            }
        }
    }

    override fun toString(): String {
        _state.loop { state ->
            when (state) {
                is Empty -> return "Mutex[${state.locked}]"
                is OpDescriptor -> state.perform(this)
                is LockedQueue -> return "Mutex[${state.owner}]"
                else -> error("Illegal state $state")
            }
        }
    }

    // Locked state with a waiters list; `owner` is the owner token, or LOCKED when there is none.
    private class LockedQueue(
        @JvmField var owner: Any
    ) : LockFreeLinkedListHead() {
        override fun toString(): String = "LockedQueue[$owner]"
    }

    // Node of the waiters list; resumption is split into a try phase and a complete phase.
    private abstract class LockWaiter(
        @JvmField val owner: Any?
    ) : LockFreeLinkedListNode(), DisposableHandle {
        final override fun dispose() { remove() }
        // returns a non-null token when the waiter was resumed, `null` when it could not be resumed
        abstract fun tryResumeLockWaiter(): Any?
        abstract fun completeResumeLockWaiter(token: Any)
    }

    // Waiter for a suspended `lock` invocation.
    private class LockCont(
        owner: Any?,
        @JvmField val cont: CancellableContinuation<Unit>
    ) : LockWaiter(owner) {
        override fun tryResumeLockWaiter() = cont.tryResume(Unit)
        override fun completeResumeLockWaiter(token: Any) = cont.completeResume(token)
        override fun toString(): String = "LockCont[$owner, $cont]"
    }

    // Waiter for an `onLock` clause registered in a select expression.
    private class LockSelect<R>(
        owner: Any?,
        @JvmField val mutex: Mutex,
        @JvmField val select: SelectInstance<R>,
        @JvmField val block: suspend (Mutex) -> R
    ) : LockWaiter(owner) {
        override fun tryResumeLockWaiter(): Any? = if (select.trySelect(null)) SELECT_SUCCESS else null
        override fun completeResumeLockWaiter(token: Any) {
            check(token === SELECT_SUCCESS)
            block.startCoroutine(receiver = mutex, completion = select.completion)
        }
        override fun toString(): String = "LockSelect[$owner, $mutex, $select]"
    }

    // atomic unlock operation that checks that waiters queue is empty
    private class UnlockOp(
        @JvmField val queue: LockedQueue
    ) : OpDescriptor() {
        override fun perform(affected: Any?): Any? {
            /*
               Note: queue cannot change while this UnlockOp is in progress, so all concurrent attempts to
               make a decision will reach it consistently. It does not matter what is a proposed
               decision when this UnlockOp is no longer active, because in this case the following CAS
               will fail anyway.
             */
            val success = queue.isEmpty
            val update: Any = if (success) EmptyUnlocked else queue
            (affected as MutexImpl)._state.compareAndSet(this@UnlockOp, update)
            /*
               `perform` invocation from the original `unlock` invocation may be coming too late, when
               some other thread had already helped to complete it (either successfully or not).
               That operation was unsuccessful if `state` was restored to this `queue` reference and
               that is what is being checked below.
             */
            return if (affected._state.value === queue) UNLOCK_FAIL else null
        }
    }
}