blob: 8b0fa90b2d57993053d287b3f1cd135199aea773 [file] [log] [blame]
Cody Schuffelen134ff032019-11-22 00:25:32 -08001#pragma once
2
3/*
4 * Copyright (C) 2017 The Android Open Source Project
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include <cerrno>
20#include <cstring>
21
22#include <sys/uio.h>
23
24#include "common/vsoc/lib/region_signaling_interface.h"
25#include "common/vsoc/shm/circqueue.h"
26
27// Increases the given index until it is naturally aligned for T.
28template <typename T>
29uintptr_t align(uintptr_t index) {
30 return (index + sizeof(T) - 1) & ~(sizeof(T) - 1);
31}
32
33namespace vsoc {
34class RegionSignalingInterface;
35namespace layout {
36
37template <uint32_t SizeLog2>
38void CircularQueueBase<SizeLog2>::CopyInRange(const char* buffer_in,
39 const Range& t) {
40 size_t bytes = t.end_idx - t.start_idx;
41 uint32_t index = t.start_idx & (BufferSize - 1);
42 if (index + bytes <= BufferSize) {
43 std::memcpy(buffer_ + index, buffer_in, bytes);
44 } else {
45 size_t part1_size = BufferSize - index;
46 size_t part2_size = bytes - part1_size;
47 std::memcpy(buffer_ + index, buffer_in, part1_size);
48 std::memcpy(buffer_, buffer_in + part1_size, part2_size);
49 }
50}
51
52template <uint32_t SizeLog2>
53void CircularQueueBase<SizeLog2>::CopyOutRange(const Range& t,
54 char* buffer_out) {
55 uint32_t index = t.start_idx & (BufferSize - 1);
56 size_t total_size = t.end_idx - t.start_idx;
57 if (index + total_size <= BufferSize) {
58 std::memcpy(buffer_out, buffer_ + index, total_size);
59 } else {
60 uint32_t part1_size = BufferSize - index;
61 uint32_t part2_size = total_size - part1_size;
62 std::memcpy(buffer_out, buffer_ + index, part1_size);
63 std::memcpy(buffer_out + part1_size, buffer_, part2_size);
64 }
65}
66
67template <uint32_t SizeLog2>
68void CircularQueueBase<SizeLog2>::WaitForDataLocked(
69 RegionSignalingInterface* r) {
70 while (1) {
71 uint32_t o_w_pub = w_pub_;
72 // We don't have data. Wait until some appears and try again
73 if (r_released_ != o_w_pub) {
74 return;
75 }
76 lock_.Unlock();
77 r->WaitForSignal(&w_pub_, o_w_pub);
78 lock_.Lock();
79 }
80}
81
82template <uint32_t SizeLog2>
83intptr_t CircularQueueBase<SizeLog2>::WriteReserveLocked(
84 RegionSignalingInterface* r, size_t bytes, Range* t, bool non_blocking) {
85 // Can't write more than the buffer will hold
86 if (bytes > BufferSize) {
87 return -ENOSPC;
88 }
89 while (true) {
90 uint32_t o_w_pub = w_pub_;
91 uint32_t o_r_release = r_released_;
92 uint32_t bytes_in_use = o_w_pub - o_r_release;
93 size_t available = BufferSize - bytes_in_use;
94 if (available >= bytes) {
95 t->start_idx = o_w_pub;
96 t->end_idx = o_w_pub + bytes;
97 break;
98 }
99 if (non_blocking) {
100 return -EWOULDBLOCK;
101 }
102 // If we can't write at the moment wait for a reader to release
103 // some bytes.
104 lock_.Unlock();
105 r->WaitForSignal(&r_released_, o_r_release);
106 lock_.Lock();
107 }
108 return t->end_idx - t->start_idx;
109}
110
111template <uint32_t SizeLog2>
112intptr_t CircularByteQueue<SizeLog2>::Read(RegionSignalingInterface* r,
113 char* buffer_out, size_t max_size) {
114 this->lock_.Lock();
115 this->WaitForDataLocked(r);
116 Range t;
117 t.start_idx = this->r_released_;
118 t.end_idx = this->w_pub_;
119 // The lock is still held here...
120 // Trim the range if we got more than the reader wanted
121 if ((t.end_idx - t.start_idx) > max_size) {
122 t.end_idx = t.start_idx + max_size;
123 }
124 this->CopyOutRange(t, buffer_out);
125 this->r_released_ = t.end_idx;
126 this->lock_.Unlock();
127 r->SendSignal(layout::Sides::Both, &this->r_released_);
128 return t.end_idx - t.start_idx;
129}
130
131template <uint32_t SizeLog2>
132intptr_t CircularByteQueue<SizeLog2>::Write(RegionSignalingInterface* r,
133 const char* buffer_in, size_t bytes,
134 bool non_blocking) {
135 Range range;
136 this->lock_.Lock();
137 intptr_t rval = this->WriteReserveLocked(r, bytes, &range, non_blocking);
138 if (rval < 0) {
139 this->lock_.Unlock();
140 return rval;
141 }
142 this->CopyInRange(buffer_in, range);
143 // We can't publish until all of the previous write allocations where
144 // published.
145 this->w_pub_ = range.end_idx;
146 this->lock_.Unlock();
147 r->SendSignal(layout::Sides::Both, &this->w_pub_);
148 return bytes;
149}
150
151template <uint32_t SizeLog2, uint32_t MaxPacketSize>
152intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::CalculateBufferedSize(
153 size_t payload) {
154 return align<uint32_t>(sizeof(uint32_t) + payload);
155}
156
157template <uint32_t SizeLog2, uint32_t MaxPacketSize>
158intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Read(
159 RegionSignalingInterface* r, char* buffer_out, size_t max_size) {
160 this->lock_.Lock();
161 this->WaitForDataLocked(r);
162 uint32_t packet_size = *reinterpret_cast<uint32_t*>(
163 this->buffer_ + (this->r_released_ & (this->BufferSize - 1)));
164 if (packet_size > max_size) {
165 this->lock_.Unlock();
166 return -ENOSPC;
167 }
168 Range t;
169 t.start_idx = this->r_released_ + sizeof(uint32_t);
170 t.end_idx = t.start_idx + packet_size;
171 this->CopyOutRange(t, buffer_out);
172 this->r_released_ += this->CalculateBufferedSize(packet_size);
173 this->lock_.Unlock();
174 r->SendSignal(layout::Sides::Both, &this->r_released_);
175 return packet_size;
176}
177
178template <uint32_t SizeLog2, uint32_t MaxPacketSize>
179intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Write(
180 RegionSignalingInterface* r, const char* buffer_in, uint32_t bytes,
181 bool non_blocking) {
182 iovec iov;
183 iov.iov_base = const_cast<char *>(buffer_in);
184 iov.iov_len = bytes;
185 return Writev(r, &iov, 1 /* iov_count */, non_blocking);
186}
187
188template <uint32_t SizeLog2, uint32_t MaxPacketSize>
189intptr_t CircularPacketQueue<SizeLog2, MaxPacketSize>::Writev(
190 RegionSignalingInterface *r,
191 const iovec *iov,
192 size_t iov_count,
193 bool non_blocking) {
194 size_t bytes = 0;
195 for (size_t i = 0; i < iov_count; ++i) {
196 bytes += iov[i].iov_len;
197 }
198
199 if (bytes > MaxPacketSize) {
200 return -ENOSPC;
201 }
202
203 Range range;
204 size_t buffered_size = this->CalculateBufferedSize(bytes);
205 this->lock_.Lock();
206 intptr_t rval =
207 this->WriteReserveLocked(r, buffered_size, &range, non_blocking);
208 if (rval < 0) {
209 this->lock_.Unlock();
210 return rval;
211 }
212 Range header = range;
213 header.end_idx = header.start_idx + sizeof(uint32_t);
214 Range payload{
215 static_cast<uint32_t>(range.start_idx + sizeof(uint32_t)),
216 static_cast<uint32_t>(range.start_idx + sizeof(uint32_t) + bytes)};
217 this->CopyInRange(reinterpret_cast<const char*>(&bytes), header);
218
219 Range subRange = payload;
220 for (size_t i = 0; i < iov_count; ++i) {
221 subRange.end_idx = subRange.start_idx + iov[i].iov_len;
222 this->CopyInRange(static_cast<const char *>(iov[i].iov_base), subRange);
223
224 subRange.start_idx = subRange.end_idx;
225 }
226
227 this->w_pub_ = range.end_idx;
228 this->lock_.Unlock();
229 r->SendSignal(layout::Sides::Both, &this->w_pub_);
230 return bytes;
231}
232
233} // namespace layout
234} // namespace vsoc