/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kotlinx.coroutines.experimental.channels

import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
 * Channel with array buffer of a fixed [capacity].
 * Sender suspends only when buffer is full and receiver suspends only when buffer is empty.
 *
 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
 * The lists of suspended senders or receivers are lock-free.
 *
 * Construction fails with [IllegalStateException] when [capacity] is less than 1.
 */
public class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    val capacity: Int
) : AbstractChannel<E>() {
    init {
        check(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
    }

    // Guards `buffer`, `head` and every write to `size`.
    private val lock = ReentrantLock()

    // Circular buffer: the oldest element lives at `head`, the next free slot is `(head + size) % capacity`.
    private val buffer: Array<Any?> = arrayOfNulls<Any?>(capacity)
    private var head: Int = 0

    // Volatile because `isBufferEmpty` / `isBufferFull` read it without taking the lock.
    @Volatile
    private var size: Int = 0

    override val hasBuffer: Boolean get() = true
    override val isBufferEmpty: Boolean get() = size == 0
    override val isBufferFull: Boolean get() = size == capacity

    /**
     * Tries to add [element] without suspending.
     *
     * Result is `OFFER_SUCCESS | OFFER_FAILED | Closed`:
     * - the current `closedForSend` value when it is not null;
     * - `OFFER_SUCCESS` when the element was stored in the buffer;
     * - `OFFER_FAILED` when the buffer already holds [capacity] elements;
     * - the `offerResult` of the queued receiver when the element was handed to it directly.
     */
    override fun offerInternal(element: E): Any {
        var token: Any? = null
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            val size = this.size
            closedForSend?.let { return it }
            if (size < capacity) {
                // tentatively put element to buffer
                // NOTE(review): presumably a receiver re-checks `isBufferEmpty` while enqueueing itself,
                // which is why the new size must be visible first -- confirm against AbstractChannel.
                this.size = size + 1 // update size before checking queue (!!!)
                // check for receivers that were waiting on empty queue
                if (size == 0) {
                    while (true) {
                        val candidate = takeFirstReceiveOrPeekClosed() ?: break // break when no receivers queued
                        val resumeToken = candidate.tryResumeReceive(element)
                        if (resumeToken != null) {
                            this.size = size // restore size: the element bypasses the buffer
                            receive = candidate
                            token = resumeToken
                            return@withLock
                        }
                        // a null token means this entry could not be resumed -- try the next one
                    }
                }
                buffer[(head + size) % capacity] = element // actually queue element
                return OFFER_SUCCESS
            }
            // size == capacity: full
            return OFFER_FAILED
        }
        // Reached only through `return@withLock` above, after both `receive` and `token` were assigned,
        // so the `!!` assertions cannot fail. Completion runs outside of the lock.
        val receiver = receive!!
        receiver.completeResumeReceive(token!!)
        return receiver.offerResult
    }

    /**
     * Tries to take the oldest buffered element without suspending.
     *
     * Result is `E | POLL_EMPTY | Closed`: when the buffer is empty it is the current `closedForSend`
     * value, or `POLL_EMPTY` if that is null; otherwise it is the element removed from the head.
     *
     * When the buffer was full before the removal, the first queued sender that can be resumed has its
     * `pollResult` moved into the freed slot (unless `isClosed` reports true for that value), and the
     * resume is completed after the lock is released.
     */
    override fun pollInternal(): Any? {
        var token: Any? = null
        var send: Send? = null
        var result: Any? = null
        lock.withLock {
            val size = this.size
            if (size == 0) return closedForSend ?: POLL_EMPTY
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null // drop the reference held by the buffer
            this.size = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_EMPTY
            if (size == capacity) {
                while (true) {
                    val candidate = takeFirstSendOrPeekClosed() ?: break // break when no senders queued
                    val resumeToken = candidate.tryResumeSend()
                    if (resumeToken != null) {
                        send = candidate
                        token = resumeToken
                        replacement = candidate.pollResult
                        break
                    }
                    // a null token means this entry could not be resumed -- try the next one
                }
            }
            if (replacement !== POLL_EMPTY && !isClosed(replacement)) {
                this.size = size // restore size: the freed slot is refilled immediately
                // `head` is advanced below, so this index is the last slot of the shifted window
                buffer[(head + size) % capacity] = replacement
            }
            head = (head + 1) % capacity
        }
        // complete the send that we have taken the replacement from;
        // `send` is always assigned together with a non-null `token`, so `!!` cannot fail
        token?.let { send!!.completeResumeSend(it) }
        return result
    }
}
121}