blob: cff2894142f0b3a4c6b25dae235b97914ce3b704 [file] [log] [blame]
Roman Elizarov7b2d8b02017-02-02 20:09:14 +03001package kotlinx.coroutines.experimental.channels
2
3import java.util.concurrent.locks.ReentrantLock
4
5/**
6 * Channel with array buffer of a fixed [capacity].
7 * Sender suspends only when buffer is fully and receiver suspends only when buffer is empty.
8 *
9 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
10 * The lists of suspended senders or receivers are lock-free.
11 */
12public class ArrayChannel<E>(val capacity: Int) : AbstractChannel<E>() {
13 init {
14 check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
15 }
16
17 private val lock = ReentrantLock()
18 private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
19 private var head: Int = 0
20 @Volatile
21 private var size: Int = 0
22
23 private inline fun <T> locked(block: () -> T): T {
24 lock.lock()
25 return try { block() }
26 finally { lock.unlock() }
27 }
28
29 override val hasBuffer: Boolean get() = true
30 override val isBufferEmpty: Boolean get() = size == 0
31 override val isBufferFull: Boolean get() = size == capacity
32
33 override fun offerInternal(element: E): Int {
34 var token: Any? = null
35 var receive: ReceiveOrClosed<E>? = null
36 locked {
37 val size = this.size
38 if (isClosedForSend) return OFFER_CLOSED
39 if (size < capacity) {
40 // tentatively put element to buffer
41 this.size = size + 1 // update size before checking queue (!!!)
42 // check for receivers that were waiting on empty queue
43 if (size == 0) {
44 while (true) {
45 receive = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
46 token = receive!!.tryResumeReceive(element)
47 if (token != null) {
48 this.size = size // restore size
49 return@locked
50 }
51 }
52 }
53 buffer[(head + size) % capacity] = element // actually queue element
54 return OFFER_SUCCESS
55 }
56 // size == capacity: full
57 return OFFER_FAILED
58 }
59 // breaks here if offer meets receiver
60 receive!!.completeResumeReceive(token!!)
61 return receive!!.offerResult
62 }
63
64 // result is `E | POLL_EMPTY | POLL_CLOSED`
65 override fun pollInternal(): Any? {
66 var token: Any? = null
67 var send: Send? = null
68 var result: Any? = null
69 locked {
70 val size = this.size
71 if (size == 0) return if (isClosedTokenFirstInQueue) POLL_CLOSED else POLL_EMPTY
72 // size > 0: not empty -- retrieve element
73 result = buffer[head]
74 buffer[head] = null
75 this.size = size - 1 // update size before checking queue (!!!)
76 // check for senders that were waiting on full queue
77 var replacement: Any? = POLL_EMPTY
78 if (size == capacity) {
79 while (true) {
80 send = takeFirstSendOrPeekClosed() ?: break
81 token = send!!.tryResumeSend()
82 if (token != null) {
83 replacement = send!!.pollResult
84 break
85 }
86 }
87 }
88 if (replacement !== POLL_EMPTY && replacement !== POLL_CLOSED) {
89 this.size = size // restore size
90 buffer[(head + size) % capacity] = replacement
91 }
92 head = (head + 1) % capacity
93 }
94 // complete send the we're taken replacement from
95 if (token != null)
96 send!!.completeResumeSend(token!!)
97 return result
98 }
99}