| /* |
| * Copyright 2016-2017 JetBrains s.r.o. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kotlinx.coroutines.experimental |
| |
| import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead |
| import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode |
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater |
| |
| /** |
| * Mutual exclusion for coroutines. |
| * |
| * Mutex has two states: _locked_ and _unlocked_. |
| * It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds |
| * the lock still suspends the invoker. |
| * |
| * @param locked initial state of the mutex |
| */ |
| public class Mutex(locked: Boolean = false) { |
| // State is: Empty | UnlockOp | LockFreeLinkedListHead (queue of Waiter objects) |
| @Volatile |
| private var state: Any? = if (locked) EmptyLocked else EmptyUnlocked // shared objects while we have no waiters |
| |
| private companion object { |
| @JvmStatic |
| private val STATE: AtomicReferenceFieldUpdater<Mutex, Any?> = |
| AtomicReferenceFieldUpdater.newUpdater(Mutex::class.java, Any::class.java, "state") |
| |
| @JvmStatic |
| private val EmptyLocked = Empty(true) |
| |
| @JvmStatic |
| private val EmptyUnlocked = Empty(false) |
| } |
| |
| /** |
| * Tries to lock this mutex, returning `false` if this mutex is already locked. |
| */ |
| public fun tryLock(): Boolean { |
| while (true) { // lock-free loop on state |
| val state = this.state |
| when (state) { |
| is Empty -> { |
| if (state.locked) return false |
| if (STATE.compareAndSet(this, state, EmptyLocked)) return true |
| } |
| is UnlockOp -> state.helpComplete() // help |
| else -> return false |
| } |
| } |
| } |
| |
| /** |
| * Locks this mutex, suspending caller while the mutex is locked. |
| * |
| * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this |
| * function is suspended, this function immediately resumes with [CancellationException]. |
| * Cancellation of suspended lock invocation is *atomic* -- when this function |
| * throws [CancellationException] it means that the mutex was not locked. |
| * |
| * Note, that this function does not check for cancellation when it is not suspended. |
| * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. |
| */ |
| public suspend fun lock() { |
| // fast-path -- try lock |
| if (tryLock()) return |
| // slow-path -- suspend |
| return lockSuspend() |
| } |
| |
| private suspend fun lockSuspend() = suspendCancellableCoroutine<Unit>(holdCancellability = true) sc@ { cont -> |
| val waiter = Waiter(cont) |
| loop@ while (true) { // lock-free loop on state |
| val state = this.state |
| when (state) { |
| is Empty -> { |
| if (state.locked) { |
| // try upgrade to queue & retry |
| STATE.compareAndSet(this, state, LockFreeLinkedListHead()) |
| continue@loop |
| } else { |
| // try lock |
| if (STATE.compareAndSet(this, state, EmptyLocked)) { |
| // locked |
| cont.resume(Unit) |
| return@sc |
| } |
| } |
| } |
| is UnlockOp -> { // help & retry |
| state.helpComplete() |
| continue@loop |
| } |
| else -> { |
| state as LockFreeLinkedListHead // type assertion |
| if (state.addLastIf(waiter, { this.state === state })) { |
| // added to waiter list! |
| cont.initCancellability() |
| cont.removeOnCompletion(waiter) |
| return@sc |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked. |
| */ |
| public fun unlock() { |
| while (true) { // lock-free loop on state |
| val state = this.state |
| when (state) { |
| is Empty -> { |
| check(state.locked) { "Mutex is not locked" } |
| if (STATE.compareAndSet(this, state, EmptyUnlocked)) return |
| } |
| is UnlockOp -> state.helpComplete() |
| else -> { |
| state as LockFreeLinkedListHead // type assertion |
| val waiter = state.removeFirstOrNull() |
| if (waiter == null) { |
| val op = UnlockOp(state) |
| if (STATE.compareAndSet(this, state, op) && op.helpComplete()) return |
| } else { |
| val cont = (waiter as Waiter).cont |
| val token = cont.tryResume(Unit) |
| if (token != null) { |
| // successfully resumed waiter that now is holding the lock |
| cont.completeResume(token) |
| return |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| private class Empty(val locked: Boolean) { |
| override fun toString(): String = "Empty[${if (locked) "Locked" else "Unlocked"}]"; |
| } |
| |
| private class Waiter(val cont: CancellableContinuation<Unit>) : LockFreeLinkedListNode() |
| |
| // atomic unlock operation that checks that waiters queue is empty |
| private inner class UnlockOp(val queue: LockFreeLinkedListHead) { |
| fun helpComplete(): Boolean { |
| val success = queue.isEmpty // Note: queue cannot change anymore (so decision is consistent) |
| val update: Any = if (success) EmptyUnlocked else queue |
| STATE.compareAndSet(this@Mutex, this@UnlockOp, update) |
| return success |
| } |
| } |
| } |