blob: b3672841ea1b0c0110dc7ea6168b2d20bd3ce411 [file] [log] [blame]
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.experimental.sync
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
import kotlinx.coroutines.experimental.internalAnnotations.*
/**
* Mutual exclusion for coroutines.
*
* Mutex has two states: _locked_ and _unlocked_.
* It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds
* the lock still suspends the invoker.
*
* JVM API note:
* Memory semantic of the [Mutex] is similar to `synchronized` block on JVM:
* An unlock on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
* Unsuccessful call to [tryLock] do not have any memory effects.
*/
public interface Mutex {
/**
* Returns `true` when this mutex is locked.
*/
public val isLocked: Boolean
/**
* Tries to lock this mutex, returning `false` if this mutex is already locked.
*
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
* is already locked with the same token (same identity), this function throws [IllegalStateException].
*/
public fun tryLock(owner: Any? = null): Boolean
/**
* Locks this mutex, suspending caller while the mutex is locked.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended lock invocation is atomic* -- when this function
* throws [CancellationException] it means that the mutex was not locked.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this lock operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onLock] clause.
* Use [tryLock] to try acquire lock without waiting.
*
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
* is already locked with the same token (same identity), this function throws [IllegalStateException].
*/
public suspend fun lock(owner: Any? = null)
/**
* Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
* Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
* the reference to this mutex is passed into the corresponding block.
*/
public val onLock: SelectClause2<Any?, Mutex>
/**
* Checks mutex locked by owner
*
* @return `true` on mutex lock by owner, `false` if not locker or it is locked by different owner
*/
public fun holdsLock(owner: Any): Boolean
/**
* Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked.
*
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
* was locked with the different token (by identity), this function throws [IllegalStateException].
*/
public fun unlock(owner: Any? = null)
}
/**
* Creates new [Mutex] instance.
* The mutex created is fair: lock is granted in first come, first served order.
*
* @param locked initial state of the mutex.
*/
@Suppress("FunctionName")
public fun Mutex(locked: Boolean = false): Mutex =
MutexImpl(locked)
/**
* Executes the given [action] under this mutex's lock.
*
* @param owner Optional owner token for debugging.
*
* @return the return value of the action.
*/
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
lock(owner)
try {
return action()
} finally {
unlock(owner)
}
}
/**
* @suppress: **Deprecated**: binary compatibility with old code
*/
@Deprecated("binary compatibility with old code", level = DeprecationLevel.HIDDEN)
public suspend fun <T> Mutex.withLock(owner: Any? = null, action: suspend () -> T): T =
withLock(owner) { action() }
/**
* @suppress: **Deprecated**: Use [withLock]
*/
@Deprecated("Use `withLock(owner, action)", level = DeprecationLevel.HIDDEN)
public suspend fun <T> Mutex.withLock(action: suspend () -> T): T =
withLock { action() }
/**
* @suppress: **Deprecated**: Use [withLock]
*/
@Deprecated("Use `withLock`", replaceWith = ReplaceWith("withLock(action)"))
public suspend fun <T> Mutex.withMutex(action: suspend () -> T): T =
withLock { action() }
private val LOCK_FAIL = Symbol("LOCK_FAIL")
private val ENQUEUE_FAIL = Symbol("ENQUEUE_FAIL")
private val UNLOCK_FAIL = Symbol("UNLOCK_FAIL")
private val SELECT_SUCCESS = Symbol("SELECT_SUCCESS")
private val LOCKED = Symbol("LOCKED")
private val UNLOCKED = Symbol("UNLOCKED")
private val RESUME_QUIESCENT = Symbol("RESUME_QUIESCENT")
private val RESUME_ACTIVE = Symbol("RESUME_ACTIVE")
private val EmptyLocked = Empty(LOCKED)
private val EmptyUnlocked = Empty(UNLOCKED)
private class Empty(
@JvmField val locked: Any
) {
override fun toString(): String = "Empty[$locked]"
}
internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
// State is: Empty | LockedQueue | OpDescriptor
// shared objects while we have no waiters
private val _state = atomic<Any?>(if (locked) EmptyLocked else EmptyUnlocked)
// resumeNext is: RESUME_QUIESCENT | RESUME_ACTIVE | ResumeReq
private val _resumeNext = atomic<Any>(RESUME_QUIESCENT)
public override val isLocked: Boolean get() {
_state.loop { state ->
when (state) {
is Empty -> return state.locked !== UNLOCKED
is LockedQueue -> return true
is OpDescriptor -> state.perform(this) // help
else -> error("Illegal state $state")
}
}
}
// for tests ONLY
internal val isLockedEmptyQueueState: Boolean get() {
val state = _state.value
return state is LockedQueue && state.isEmpty
}
public override fun tryLock(owner: Any?): Boolean {
_state.loop { state ->
when (state) {
is Empty -> {
if (state.locked !== UNLOCKED) return false
val update = if (owner == null) EmptyLocked else Empty(
owner
)
if (_state.compareAndSet(state, update)) return true
}
is LockedQueue -> {
check(state.owner !== owner) { "Already locked by $owner" }
return false
}
is OpDescriptor -> state.perform(this) // help
else -> error("Illegal state $state")
}
}
}
public override suspend fun lock(owner: Any?) {
// fast-path -- try lock
if (tryLock(owner)) return
// slow-path -- suspend
return lockSuspend(owner)
}
private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutine<Unit>(holdCancellability = true) sc@ { cont ->
val waiter = LockCont(owner, cont)
_state.loop { state ->
when (state) {
is Empty -> {
if (state.locked !== UNLOCKED) { // try upgrade to queue & retry
_state.compareAndSet(state, LockedQueue(state.locked))
} else {
// try lock
val update = if (owner == null) EmptyLocked else Empty(owner)
if (_state.compareAndSet(state, update)) { // locked
cont.resume(Unit)
return@sc
}
}
}
is LockedQueue -> {
val curOwner = state.owner
check(curOwner !== owner) { "Already locked by $owner" }
if (state.addLastIf(waiter, { _state.value === state })) {
// added to waiter list!
cont.initCancellability() // make it properly cancellable
cont.removeOnCancellation(waiter)
return@sc
}
}
is OpDescriptor -> state.perform(this) // help
else -> error("Illegal state $state")
}
}
}
override val onLock: SelectClause2<Any?, Mutex>
get() = this
// registerSelectLock
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun <R> registerSelectClause2(select: SelectInstance<R>, owner: Any?, block: suspend (Mutex) -> R) {
while (true) { // lock-free loop on state
if (select.isSelected) return
val state = _state.value
when (state) {
is Empty -> {
if (state.locked !== UNLOCKED) { // try upgrade to queue & retry
_state.compareAndSet(state, LockedQueue(state.locked))
} else {
// try lock
val failure = select.performAtomicTrySelect(TryLockDesc(this, owner))
when {
failure == null -> { // success
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
return
}
failure === ALREADY_SELECTED -> return // already selected -- bail out
failure === LOCK_FAIL -> {} // retry
else -> error("performAtomicTrySelect(TryLockDesc) returned $failure")
}
}
}
is LockedQueue -> {
check(state.owner !== owner) { "Already locked by $owner" }
val enqueueOp = TryEnqueueLockDesc(this, owner, state, select, block)
val failure = select.performAtomicIfNotSelected(enqueueOp)
when {
failure == null -> { // successfully enqueued
select.disposeOnSelect(enqueueOp.node)
return
}
failure === ALREADY_SELECTED -> return // already selected -- bail out
failure === ENQUEUE_FAIL -> {} // retry
else -> error("performAtomicIfNotSelected(TryEnqueueLockDesc) returned $failure")
}
}
is OpDescriptor -> state.perform(this) // help
else -> error("Illegal state $state")
}
}
}
private class TryLockDesc(
@JvmField val mutex: MutexImpl,
@JvmField val owner: Any?
) : AtomicDesc() {
// This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation
private inner class PrepareOp(private val op: AtomicOp<*>) : OpDescriptor() {
override fun perform(affected: Any?): Any? {
val update: Any = if (op.isDecided) EmptyUnlocked else op // restore if was already decided
(affected as MutexImpl)._state.compareAndSet(this, update)
return null // ok
}
}
override fun prepare(op: AtomicOp<*>): Any? {
val prepare = PrepareOp(op)
if (!mutex._state.compareAndSet(EmptyUnlocked, prepare)) return LOCK_FAIL
return prepare.perform(mutex)
}
override fun complete(op: AtomicOp<*>, failure: Any?) {
val update = if (failure != null) EmptyUnlocked else {
if (owner == null) EmptyLocked else Empty(owner)
}
mutex._state.compareAndSet(op, update)
}
}
private class TryEnqueueLockDesc<R>(
@JvmField val mutex: MutexImpl,
owner: Any?,
queue: LockedQueue,
select: SelectInstance<R>,
block: suspend (Mutex) -> R
) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, mutex, select, block)) {
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
if (mutex._state.value !== queue) return ENQUEUE_FAIL
return super.onPrepare(affected, next)
}
}
public override fun holdsLock(owner: Any) =
_state.value.let { state ->
when (state) {
is Empty -> state.locked === owner
is LockedQueue -> state.owner === owner
else -> false
}
}
public override fun unlock(owner: Any?) {
_state.loop { state ->
when (state) {
is Empty -> {
if (owner == null)
check(state.locked !== UNLOCKED) { "Mutex is not locked" }
else
check(state.locked === owner) { "Mutex is locked by ${state.locked} but expected $owner" }
if (_state.compareAndSet(state, EmptyUnlocked)) return
}
is OpDescriptor -> state.perform(this)
is LockedQueue -> {
if (owner != null)
check(state.owner === owner) { "Mutex is locked by ${state.owner} but expected $owner" }
val waiter = state.removeFirstOrNull()
if (waiter == null) {
val op = UnlockOp(state)
if (_state.compareAndSet(state, op) && op.perform(this) == null) return
} else {
val token = (waiter as LockWaiter).tryResumeLockWaiter()
if (token != null) {
// successfully resumed waiter that now is holding the lock
// we must immediately transfer ownership to the next waiter, because this coroutine
// might try to lock it again after unlock returns do to StackOverflow avoidance code
// and its attempts to take a lock must be queued.
state.owner = waiter.owner ?: LOCKED
// StackOverflow avoidance code
if (startResumeNext(waiter, token)) {
waiter.completeResumeLockWaiter(token)
finishResumeNext()
}
return
}
}
}
else -> error("Illegal state $state")
}
}
}
private class ResumeReq(
@JvmField val waiter: LockWaiter,
@JvmField val token: Any
)
private fun startResumeNext(waiter: LockWaiter, token: Any): Boolean {
_resumeNext.loop { resumeNext ->
when {
resumeNext === RESUME_QUIESCENT -> {
// this is never concurrent, because only one thread is holding mutex and trying to resume
// next waiter, so no need to CAS here
_resumeNext.value = RESUME_ACTIVE
return true
}
resumeNext === RESUME_ACTIVE ->
if (_resumeNext.compareAndSet(resumeNext, ResumeReq(waiter, token))) return false
else -> error("Cannot happen")
}
}
}
private fun finishResumeNext() {
// also a resumption loop to fulfill requests of inner resume invokes
_resumeNext.loop { resumeNext ->
when {
resumeNext === RESUME_ACTIVE ->
if (_resumeNext.compareAndSet(resumeNext, RESUME_QUIESCENT)) return
resumeNext is ResumeReq -> {
// this is never concurrently, only one thread is finishing, so no need to CAS here
_resumeNext.value = RESUME_ACTIVE
resumeNext.waiter.completeResumeLockWaiter(resumeNext.token)
}
else -> error("Cannot happen")
}
}
}
override fun toString(): String {
_state.loop { state ->
when (state) {
is Empty -> return "Mutex[${state.locked}]"
is OpDescriptor -> state.perform(this)
is LockedQueue -> return "Mutex[${state.owner}]"
else -> error("Illegal state $state")
}
}
}
private class LockedQueue(
@JvmField var owner: Any
) : LockFreeLinkedListHead() {
override fun toString(): String = "LockedQueue[$owner]"
}
private abstract class LockWaiter(
@JvmField val owner: Any?
) : LockFreeLinkedListNode(), DisposableHandle {
final override fun dispose() { remove() }
abstract fun tryResumeLockWaiter(): Any?
abstract fun completeResumeLockWaiter(token: Any)
}
private class LockCont(
owner: Any?,
@JvmField val cont: CancellableContinuation<Unit>
) : LockWaiter(owner) {
override fun tryResumeLockWaiter() = cont.tryResume(Unit)
override fun completeResumeLockWaiter(token: Any) = cont.completeResume(token)
override fun toString(): String = "LockCont[$owner, $cont]"
}
private class LockSelect<R>(
owner: Any?,
@JvmField val mutex: Mutex,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (Mutex) -> R
) : LockWaiter(owner) {
override fun tryResumeLockWaiter(): Any? = if (select.trySelect(null)) SELECT_SUCCESS else null
override fun completeResumeLockWaiter(token: Any) {
check(token === SELECT_SUCCESS)
block.startCoroutine(receiver = mutex, completion = select.completion)
}
override fun toString(): String = "LockSelect[$owner, $mutex, $select]"
}
// atomic unlock operation that checks that waiters queue is empty
private class UnlockOp(
@JvmField val queue: LockedQueue
) : OpDescriptor() {
override fun perform(affected: Any?): Any? {
/*
Note: queue cannot change while this UnlockOp is in progress, so all concurrent attempts to
make a decision will reach it consistently. It does not matter what is a proposed
decision when this UnlockOp is no longer active, because in this case the following CAS
will fail anyway.
*/
val success = queue.isEmpty
val update: Any = if (success) EmptyUnlocked else queue
(affected as MutexImpl)._state.compareAndSet(this@UnlockOp, update)
/*
`perform` invocation from the original `unlock` invocation may be coming too late, when
some other thread had already helped to complete it (either successfully or not).
That operation was unsuccessful if `state` was restored to this `queue` reference and
that is what is being checked below.
*/
return if (affected._state.value === queue) UNLOCK_FAIL else null
}
}
}