Mutex added
diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md
index 1a0808b..10d20cb 100644
--- a/kotlinx-coroutines-core/README.md
+++ b/kotlinx-coroutines-core/README.md
@@ -21,7 +21,14 @@
| [Executor.toCoroutineDispatcher][java.util.concurrent.Executor.toCoroutineDispatcher] | Extension to convert any executor
| [Unconfined] | Does not confine coroutine execution in any way
-The following top-level suspending functions are provided to be used _inside coroutines_:
+Synchronization primitives for coroutines:
+
+| **Name** | **Suspending functions** | **Description**
+| ---------- | ----------------------------------------------------------- | ---------------
+| [Mutex] | [lock][Mutex.lock] | Mutual exclusion
+| [Channel] | [send][SendChannel.send], [receive][ReceiveChannel.receive] | Communication channel (aka queue or exchanger)
+
+The following _top-level_ suspending functions are provided to be used inside coroutines:
| **Name** | **Description**
| ------------- | ---------------
@@ -46,6 +53,8 @@
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/index.html
+[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-mutex/index.html
+[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-mutex/lock.html
[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-non-cancellable/index.html
[Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
[java.util.concurrent.Executor.toCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/java.util.concurrent.-executor/to-coroutine-dispatcher.html
@@ -61,8 +70,11 @@
[withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html
[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
<!--- INDEX kotlinx.coroutines.experimental.channels -->
+[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
[ProducerJob]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-job/index.html
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html
+[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
+[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
<!--- END -->
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Mutex.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Mutex.kt
new file mode 100644
index 0000000..01c46f0
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Mutex.kt
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+
+/**
+ * Mutual exclusion for coroutines.
+ *
+ * Mutex has two states: _locked_ and _unlocked_.
+ * It is **non-reentrant**, that is invoking [lock] even from the same thread/coroutine that currently holds
+ * the lock still suspends the invoker.
+ *
+ * @param locked initial state of the mutex
+ */
+public class Mutex(locked: Boolean = false) {
+ // State is: Empty | UnlockOp | LockFreeLinkedListHead (queue of Waiter objects)
+ @Volatile
+ private var state: Any? = if (locked) EmptyLocked else EmptyUnlocked // shared objects while we have no waiters
+
+ private companion object {
+ @JvmStatic
+ private val STATE: AtomicReferenceFieldUpdater<Mutex, Any?> =
+ AtomicReferenceFieldUpdater.newUpdater(Mutex::class.java, Any::class.java, "state")
+
+ @JvmStatic
+ private val EmptyLocked = Empty(true)
+
+ @JvmStatic
+ private val EmptyUnlocked = Empty(false)
+ }
+
+ /**
+ * Tries to lock this mutex, returning `false` if this mutex is already locked.
+ */
+ public fun tryLock(): Boolean {
+ while (true) { // lock-free loop on state
+ val state = this.state
+ when (state) {
+ is Empty -> {
+ if (state.locked) return false
+ if (STATE.compareAndSet(this, state, EmptyLocked)) return true
+ }
+ is UnlockOp -> state.helpComplete() // help
+ else -> return false
+ }
+ }
+ }
+
+ /**
+ * Locks this mutex, suspending caller while the mutex is locked.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * Cancellation of suspended lock invocation is *atomic* -- when this function
+ * throws [CancellationException] it means that the mutex was not locked.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ */
+ public suspend fun lock() {
+ // fast-path -- try lock
+ if (tryLock()) return
+ // slow-path -- suspend
+ return lockSuspend()
+ }
+
+ private suspend fun lockSuspend() = suspendCancellableCoroutine<Unit>(holdCancellability = true) sc@ { cont ->
+ val waiter = Waiter(cont)
+ loop@ while (true) { // lock-free loop on state
+ val state = this.state
+ when (state) {
+ is Empty -> {
+ if (state.locked) {
+ // try upgrade to queue & retry
+ STATE.compareAndSet(this, state, LockFreeLinkedListHead())
+ continue@loop
+ } else {
+ // try lock
+ if (STATE.compareAndSet(this, state, EmptyLocked)) {
+ // locked
+ cont.resume(Unit)
+ return@sc
+ }
+ }
+ }
+ is UnlockOp -> { // help & retry
+ state.helpComplete()
+ continue@loop
+ }
+ else -> {
+ state as LockFreeLinkedListHead // type assertion
+ if (state.addLastIf(waiter, { this.state === state })) {
+ // added to waiter list!
+ cont.initCancellability()
+ cont.removeOnCompletion(waiter)
+ return@sc
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked.
+ */
+ public fun unlock() {
+ while (true) { // lock-free loop on state
+ val state = this.state
+ when (state) {
+ is Empty -> {
+ check(state.locked) { "Mutex is not locked" }
+ if (STATE.compareAndSet(this, state, EmptyUnlocked)) return
+ }
+ is UnlockOp -> state.helpComplete()
+ else -> {
+ state as LockFreeLinkedListHead // type assertion
+ val waiter = state.removeFirstOrNull()
+ if (waiter == null) {
+ val op = UnlockOp(state)
+ if (STATE.compareAndSet(this, state, op) && op.helpComplete()) return
+ } else {
+ val cont = (waiter as Waiter).cont
+ val token = cont.tryResume(Unit)
+ if (token != null) {
+ // successfully resumed waiter that now is holding the lock
+ cont.completeResume(token)
+ return
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private class Empty(val locked: Boolean) {
+ override fun toString(): String = "Empty[${if (locked) "Locked" else "Unlocked"}]";
+ }
+
+ private class Waiter(val cont: CancellableContinuation<Unit>) : LockFreeLinkedListNode()
+
+ // atomic unlock operation that checks that waiters queue is empty
+ private inner class UnlockOp(val queue: LockFreeLinkedListHead) {
+ fun helpComplete(): Boolean {
+ val success = queue.isEmpty // Note: queue cannot change anymore (so decision is consistent)
+ val update: Any = if (success) EmptyUnlocked else queue
+ STATE.compareAndSet(this@Mutex, this@UnlockOp, update)
+ return success
+ }
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/MutexTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/MutexTest.kt
new file mode 100644
index 0000000..ff15004
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/MutexTest.kt
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2016-2017 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kotlinx.coroutines.experimental
+
+import org.junit.Assert.*
+import org.junit.Test
+
+class MutexTest : TestBase() {
+ @Test
+ fun testSimple() = runBlocking<Unit> {
+ val mutex = Mutex()
+ expect(1)
+ launch(context) {
+ expect(4)
+ mutex.lock() // suspends
+ expect(7) // now got lock
+ mutex.unlock()
+ expect(8)
+ }
+ expect(2)
+ mutex.lock() // locked
+ expect(3)
+ yield() // yield to child
+ expect(5)
+ mutex.unlock()
+ expect(6)
+ yield() // now child has lock
+ finish(9)
+ }
+
+ @Test
+ fun tryLockTest() {
+ val mutex = Mutex()
+ assertTrue(mutex.tryLock())
+ assertFalse(mutex.tryLock())
+ mutex.unlock()
+ assertTrue(mutex.tryLock())
+ assertFalse(mutex.tryLock())
+ mutex.unlock()
+ }
+
+ @Test
+ fun testStress() = runBlocking<Unit> {
+ val n = 1000
+ val k = 100
+ var shared = 0
+ val mutex = Mutex()
+ val jobs = List(n) {
+ launch(CommonPool) {
+ repeat(k) {
+ mutex.lock()
+ shared++
+ mutex.unlock()
+ }
+ }
+ }
+ jobs.forEach { it.join() }
+ println("Shared value = $shared")
+ assertEquals(n * k, shared)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
index 450564e..40b3261 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.kt
@@ -43,7 +43,7 @@
@After
fun onCompletion() {
error.get()?.let { throw it }
- check(finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" }
+ check(actionIndex.get() == 0 || finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" }
}
}
\ No newline at end of file