blob: 6ed1cca2399c9e0c0d708834a25ac75d3d8f73d7 [file] [log] [blame]
Roman Elizarov5d94a262017-12-28 00:23:39 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov5d94a262017-12-28 00:23:39 +03003 */
4
5package kotlinx.coroutines.experimental.internal
6
7import kotlinx.atomicfu.*
8import java.util.concurrent.atomic.*
9
// Short file-local alias for the queue core class, used throughout this file
private typealias Core<E> = LockFreeMPSCQueueCore<E>
11
/**
 * Lock-free Multi-Producer Single-Consumer Queue.
 * *Note: This queue is NOT linearizable. It provides only quiescent consistency for its operations.*
 *
 * In particular, the following execution is permitted for this queue, but is not permitted for a linearizable queue:
 *
 * ```
 * Thread 1: addLast(1) = true, removeFirstOrNull() = null
 * Thread 2: addLast(2) = true // this operation is concurrent with both operations in the first thread
 * ```
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
class LockFreeMPSCQueue<E : Any> {
    // The core that operations are currently directed to; replaced by `core.next()` once the core is frozen
    private val _cur = atomic(Core<E>(Core.INITIAL_CAPACITY))

    // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
    val isEmpty: Boolean
        get() = _cur.value.isEmpty

    /**
     * Closes the queue, retrying on the next core for as long as the current one turns out to be frozen.
     */
    fun close() {
        while (true) {
            val core = _cur.value
            if (core.close()) return // this core is closed now
            _cur.compareAndSet(core, core.next()) // frozen -- advance to the next core and retry
        }
    }

    /**
     * Adds [element] to the tail of the queue.
     *
     * @return `true` if the element was added, `false` if the queue is closed.
     */
    fun addLast(element: E): Boolean {
        while (true) {
            val core = _cur.value
            val outcome = core.addLast(element)
            if (outcome == Core.ADD_SUCCESS) return true
            if (outcome == Core.ADD_CLOSED) return false
            // frozen -- advance to the next core and retry
            if (outcome == Core.ADD_FROZEN) _cur.compareAndSet(core, core.next())
        }
    }

    /**
     * Removes and returns the element at the head of the queue.
     *
     * @return the removed element, or `null` if the current core reported nothing to remove.
     */
    @Suppress("UNCHECKED_CAST")
    fun removeFirstOrNull(): E? {
        while (true) {
            val core = _cur.value
            val removed = core.removeFirstOrNull()
            if (removed === Core.REMOVE_FROZEN) {
                // frozen -- advance to the next core and retry
                _cur.compareAndSet(core, core.next())
                continue
            }
            return removed as E?
        }
    }
}
57
/**
 * Lock-free Multi-Producer Single-Consumer Queue core: a ring buffer over a fixed-size array.
 * A core that cannot accept more elements is frozen and copied into a core of twice the capacity by [next].
 * *Note: This queue is NOT linearizable. It provides only quiescent consistency for its operations.*
 *
 * The whole state is packed into a single atomic `Long`: bits 0..29 hold `head`, bits 30..59 hold `tail`,
 * bit 60 is the FROZEN flag and bit 61 is the CLOSED flag (see the constants in the companion object).
 *
 * @param capacity size of the backing array; must be a positive power of two that does not exceed 2^30.
 * @see LockFreeMPSCQueue
 * @suppress **This is unstable API and it is subject to change.**
 */
internal class LockFreeMPSCQueueCore<E : Any>(private val capacity: Int) {
    private val mask = capacity - 1 // `index and mask` maps a head/tail counter to an array slot
    private val _next = atomic<Core<E>?>(null) // the copy of this core with doubled capacity, see next()
    private val _state = atomic(0L) // packed head, tail, FROZEN and CLOSED bits
    private val array = AtomicReferenceArray<Any?>(capacity)

    init {
        // capacity 0 would otherwise pass both checks below (its mask is -1) and only fail on the first array write
        check(capacity > 0) { "Capacity must be positive, but was $capacity" }
        check(mask <= MAX_CAPACITY_MASK) { "Capacity $capacity exceeds the maximum of ${MAX_CAPACITY_MASK + 1}" }
        check(capacity and mask == 0) { "Capacity must be a power of two, but was $capacity" }
    }

    // Note: it is not atomic w.r.t. remove operation (remove can transiently fail when isEmpty is false)
    val isEmpty: Boolean get() = _state.value.withState { head, tail -> head == tail }

    /**
     * Tries to set the CLOSED bit in the state.
     *
     * @return `true` if this core is closed (by this or by an earlier call), `false` if it is frozen and
     * the caller has to retry on [next].
     */
    fun close(): Boolean {
        _state.update { state ->
            if (state and CLOSED_MASK != 0L) return true // ok - already closed
            if (state and FROZEN_MASK != 0L) return false // frozen -- try next
            state or CLOSED_MASK // try set closed bit
        }
        return true
    }

    /**
     * Tries to add [element] to the tail of this core.
     *
     * @return one of [ADD_SUCCESS], [ADD_CLOSED] or [ADD_FROZEN]; the latter is also returned when this core
     * is overfull, so that the caller freezes it and moves on to [next].
     */
    // ADD_CLOSED | ADD_FROZEN | ADD_SUCCESS
    fun addLast(element: E): Int {
        _state.loop { state ->
            if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
            state.withState { head, tail ->
                // there could be one element beyond head that is still being removed and that we must not overwrite,
                // so we check for full queue with an extra margin of one element
                if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
                val newTail = (tail + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateTail(newTail))) {
                    // successfully added
                    array[tail and mask] = element
                    // could have been frozen & copied before this item was set -- correct it by filling placeholder
                    var cur = this
                    while (true) {
                        if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
                        cur = cur.next().fillPlaceholder(tail, element) ?: break
                    }
                    return ADD_SUCCESS // added successfully
                }
            }
        }
    }

    /**
     * Replaces the [Placeholder] that was created for [index] in this core with the actual [element].
     *
     * @return this core if the placeholder was replaced (the caller then checks further copies),
     * or `null` if no correction was needed.
     */
    private fun fillPlaceholder(index: Int, element: E): Core<E>? {
        val old = array.get(index and mask)
        /*
         * addLast actions:
         * 1) Commit tail slot
         * 2) Write element to array slot
         * 3) Check for array copy
         *
         * If copy happened between 2 and 3 then the consumer might have consumed our element,
         * then another producer might have written its placeholder in our slot, so we should
         * perform *unique* check that current placeholder is ours to avoid overwriting another producer's placeholder
         */
        if (old is Placeholder && old.index == index) {
            array.set(index and mask, element)
            // we've corrected missing element, should check if that propagated to further copies, just in case
            return this
        }
        // it is Ok, no need for further action
        return null
    }

    /**
     * Tries to remove the element at the head of this core. Must be invoked by the single consumer only.
     *
     * @return [REMOVE_FROZEN] if this core is frozen, `null` if it is empty or the element at the head
     * has not been written by its producer yet, or the removed element otherwise.
     */
    // SINGLE CONSUMER
    // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
    fun removeFirstOrNull(): Any? {
        _state.loop { state ->
            if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
            state.withState { head, tail ->
                if ((tail and mask) == (head and mask)) return null // empty
                // because queue is Single Consumer, then element == null|Placeholder can only be when add has not finished yet
                val element = array[head and mask] ?: return null
                if (element is Placeholder) return null // same story -- consider it not added yet
                // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
                val newHead = (head + 1) and MAX_CAPACITY_MASK
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    array[head and mask] = null // now can safely put null (state was updated)
                    return element // successfully removed in fast-path
                }
                // Slow-path for remove in case of interference
                var cur = this
                while (true) {
                    cur = cur.removeSlowPath(head, newHead) ?: return element
                }
            }
        }
    }

    /**
     * Advances the head of this core from [oldHead] to [newHead] after the fast-path CAS has failed.
     *
     * @return `null` when the head was updated in this core, or the [next] core when this one is frozen,
     * in which case the caller has to repeat the correction there.
     */
    private fun removeSlowPath(oldHead: Int, newHead: Int): Core<E>? {
        _state.loop { state ->
            state.withState { head, _ ->
                check(head == oldHead) { "This queue can have only one consumer" }
                if (state and FROZEN_MASK != 0L) {
                    // state was already frozen, so removed element was copied to next
                    return next() // continue to correct head in next
                }
                if (_state.compareAndSet(state, state.updateHead(newHead))) {
                    array[head and mask] = null // now can safely put null (state was updated)
                    return null
                }
            }
        }
    }

    /**
     * Marks this core as frozen and returns its copy with doubled capacity, allocating it if needed.
     */
    fun next(): LockFreeMPSCQueueCore<E> = allocateOrGetNextCopy(markFrozen())

    // Sets the FROZEN bit (if not set yet) and returns the resulting state
    private fun markFrozen(): Long =
        _state.updateAndGet { state ->
            if (state and FROZEN_MASK != 0L) return state // already marked
            state or FROZEN_MASK
        }

    // Returns the published next core, or tries to publish a fresh copy built from the given frozen state
    private fun allocateOrGetNextCopy(state: Long): Core<E> {
        _next.loop { next ->
            if (next != null) return next // already allocated & copied
            _next.compareAndSet(null, allocateNextCopy(state))
        }
    }

    // Builds a core of twice the capacity that holds the elements between head and tail of the given state
    private fun allocateNextCopy(state: Long): Core<E> {
        val next = LockFreeMPSCQueueCore<E>(capacity * 2)
        state.withState { head, tail ->
            var index = head
            while (index and mask != tail and mask) {
                // replace nulls with placeholders on copy
                next.array[index and next.mask] = array[index and mask] ?: Placeholder(index)
                index++
            }
            next._state.value = state wo FROZEN_MASK // same head, tail and CLOSED bit, but not frozen
        }
        return next
    }

    // Instance of this class is placed into array when we have to copy array, but addLast is in progress --
    // it had already reserved a slot in the array (with null) and has not yet put its value there.
    // Placeholder keeps the actual index (not masked) to distinguish placeholders on different wraparounds of array
    private class Placeholder(@JvmField val index: Int)

    @Suppress("PrivatePropertyName")
    internal companion object {
        internal const val INITIAL_CAPACITY = 8

        // State layout: [ head: 30 bits | tail: 30 bits | FROZEN: 1 bit | CLOSED: 1 bit ], starting from bit 0
        private const val CAPACITY_BITS = 30
        private const val MAX_CAPACITY_MASK = (1 shl CAPACITY_BITS) - 1
        private const val HEAD_SHIFT = 0
        private const val HEAD_MASK = MAX_CAPACITY_MASK.toLong() shl HEAD_SHIFT
        private const val TAIL_SHIFT = HEAD_SHIFT + CAPACITY_BITS
        private const val TAIL_MASK = MAX_CAPACITY_MASK.toLong() shl TAIL_SHIFT

        private const val FROZEN_SHIFT = TAIL_SHIFT + CAPACITY_BITS
        private const val FROZEN_MASK = 1L shl FROZEN_SHIFT
        private const val CLOSED_SHIFT = FROZEN_SHIFT + 1
        private const val CLOSED_MASK = 1L shl CLOSED_SHIFT

        // Returned by removeFirstOrNull when the core is frozen
        @JvmField internal val REMOVE_FROZEN = Symbol("REMOVE_FROZEN")

        // Results of addLast
        internal const val ADD_SUCCESS = 0
        internal const val ADD_FROZEN = 1
        internal const val ADD_CLOSED = 2

        // "without": clears in the receiver all the bits that are set in [other]
        private infix fun Long.wo(other: Long) = this and other.inv()
        private fun Long.updateHead(newHead: Int) = (this wo HEAD_MASK) or (newHead.toLong() shl HEAD_SHIFT)
        private fun Long.updateTail(newTail: Int) = (this wo TAIL_MASK) or (newTail.toLong() shl TAIL_SHIFT)

        // Unpacks head and tail from the state and passes them to [block]
        private inline fun <T> Long.withState(block: (head: Int, tail: Int) -> T): T {
            val head = ((this and HEAD_MASK) shr HEAD_SHIFT).toInt()
            val tail = ((this and TAIL_MASK) shr TAIL_SHIFT).toInt()
            return block(head, tail)
        }

        // FROZEN | CLOSED
        private fun Long.addFailReason(): Int = if (this and CLOSED_MASK != 0L) ADD_CLOSED else ADD_FROZEN
    }
}