blob: bba075d5950c67c1b1cb202e96393e10fe4cfd99 [file] [log] [blame]
Alex Vakulenkoe4eec202017-01-27 14:41:04 -08001#include "include/private/dvr/buffer_hub_queue_client.h"
2
Alex Vakulenko4fe60582017-02-02 11:35:59 -08003#include <inttypes.h>
4#include <log/log.h>
Corey Tabaka2b99ee52017-05-04 10:56:05 -07005#include <poll.h>
Hendrik Wagenaar4d3590f2017-05-06 22:36:04 -07006#include <sys/epoll.h>
Alex Vakulenkoe4eec202017-01-27 14:41:04 -08007
8#include <array>
9
10#include <pdx/default_transport/client_channel.h>
11#include <pdx/default_transport/client_channel_factory.h>
12#include <pdx/file_handle.h>
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080013
// Evaluates |fnc_call| repeatedly for as long as it yields -1 with errno set
// to EINTR, then produces the result of the last evaluation. The loop lives in
// an immediately-invoked lambda so the macro can be used as an expression.
#define RETRY_EINTR(fnc_call)              \
  ([&]() -> decltype(fnc_call) {           \
    decltype(fnc_call) result;             \
    for (;;) {                             \
      result = (fnc_call);                 \
      if (result != -1 || errno != EINTR)  \
        break;                             \
    }                                      \
    return result;                         \
  })()
22
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070023using android::pdx::ErrorStatus;
24using android::pdx::LocalChannelHandle;
Corey Tabakab7ca5de2017-05-08 18:55:02 -070025using android::pdx::LocalHandle;
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070026using android::pdx::Status;
27
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080028namespace android {
29namespace dvr {
30
Corey Tabakab7ca5de2017-05-08 18:55:02 -070031namespace {
32
33// Polls an fd for the given events.
34Status<int> PollEvents(int fd, short events) {
35 const int kTimeoutMs = 0;
36 pollfd pfd{fd, events, 0};
37 const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
38 if (count < 0) {
39 return ErrorStatus(errno);
40 } else if (count == 0) {
41 return ErrorStatus(ETIMEDOUT);
42 } else {
43 return {pfd.revents};
44 }
45}
46
47// Polls a buffer for the given events, taking care to do the proper
48// translation.
49Status<int> PollEvents(const std::shared_ptr<BufferHubBuffer>& buffer,
50 short events) {
51 auto poll_status = PollEvents(buffer->event_fd(), events);
52 if (!poll_status)
53 return poll_status;
54
55 return buffer->GetEventMask(poll_status.get());
56}
57
// Splits a 64-bit value into two signed 32-bit halves, returned as
// {upper 32 bits, lower 32 bits}.
std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
  const uint32_t high_bits = static_cast<uint32_t>(value >> 32);
  const uint32_t low_bits = static_cast<uint32_t>(value);
  return {static_cast<int32_t>(high_bits), static_cast<int32_t>(low_bits)};
}
62
// Packs two signed 32-bit values into one 64-bit word: |a| occupies the upper
// 32 bits and |b| the lower 32 bits. Each value is converted to uint32_t first
// so that a negative |b| cannot sign-extend into |a|'s half.
uint64_t Stuff(int32_t a, int32_t b) {
  uint64_t packed = static_cast<uint32_t>(a);
  packed <<= 32;
  packed |= static_cast<uint32_t>(b);
  return packed;
}
68
69} // anonymous namespace
70
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070071BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080072 : Client{pdx::default_transport::ClientChannel::Create(
Corey Tabakab7ca5de2017-05-08 18:55:02 -070073 std::move(channel_handle))} {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080074 Initialize();
75}
76
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070077BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
Corey Tabakab7ca5de2017-05-08 18:55:02 -070078 : Client{
79 pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080080 Initialize();
81}
82
83void BufferHubQueue::Initialize() {
84 int ret = epoll_fd_.Create();
85 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -080086 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
87 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080088 return;
89 }
90
Corey Tabakab7ca5de2017-05-08 18:55:02 -070091 epoll_event event = {
92 .events = EPOLLIN | EPOLLET,
93 .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080094 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
95 if (ret < 0) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070096 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
Alex Vakulenko4fe60582017-02-02 11:35:59 -080097 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080098 }
99}
100
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700101Status<void> BufferHubQueue::ImportQueue() {
102 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800103 if (!status) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700104 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
105 status.GetErrorMessage().c_str());
106 return ErrorStatus(status.error());
107 } else {
Jiwen 'Steve' Cai6bffc672017-05-18 23:05:05 -0700108 SetupQueue(status.get());
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700109 return {};
110 }
111}
112
Jiwen 'Steve' Cai6bffc672017-05-18 23:05:05 -0700113void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
114 is_async_ = queue_info.producer_config.is_async;
115 default_width_ = queue_info.producer_config.default_width;
116 default_height_ = queue_info.producer_config.default_height;
117 default_format_ = queue_info.producer_config.default_format;
118 meta_size_ = queue_info.producer_config.meta_size_bytes;
119 id_ = queue_info.id;
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700120}
121
122std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
123 if (auto status = CreateConsumerQueueHandle())
124 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
125 else
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800126 return nullptr;
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700127}
128
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700129std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
130 if (auto status = CreateConsumerQueueHandle())
131 return std::unique_ptr<ConsumerQueue>(
132 new ConsumerQueue(status.take(), true));
133 else
134 return nullptr;
135}
136
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700137Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
138 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
139 if (!status) {
140 ALOGE(
141 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
142 "%s",
143 status.GetErrorMessage().c_str());
144 return ErrorStatus(status.error());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800145 }
146
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700147 return status;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800148}
149
150bool BufferHubQueue::WaitForBuffers(int timeout) {
151 std::array<epoll_event, kMaxEvents> events;
152
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700153 // Loop at least once to check for hangups.
154 do {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700155 ALOGD_IF(
156 TRACE,
157 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
158 id(), count(), capacity());
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700159
160 // If there is already a buffer then just check for hangup without waiting.
161 const int ret = epoll_fd_.Wait(events.data(), events.size(),
162 count() == 0 ? timeout : 0);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800163
164 if (ret == 0) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700165 ALOGI_IF(TRACE,
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700166 "BufferHubQueue::WaitForBuffers: No events before timeout: "
167 "queue_id=%d",
168 id());
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700169 return count() != 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800170 }
171
172 if (ret < 0 && ret != -EINTR) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700173 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
174 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800175 return false;
176 }
177
178 const int num_events = ret;
179
180 // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
181 // one for each buffer, in the queue and one extra event for the queue
182 // client itself.
183 for (int i = 0; i < num_events; i++) {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700184 int32_t event_fd;
185 int32_t index;
186 std::tie(event_fd, index) = Unstuff(events[i].data.u64);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800187
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700188 ALOGD_IF(TRACE,
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700189 "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
190 i, event_fd, index);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800191
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800192 if (is_buffer_event_index(index)) {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700193 HandleBufferEvent(static_cast<size_t>(index), event_fd,
194 events[i].events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800195 } else if (is_queue_event_index(index)) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700196 HandleQueueEvent(events[i].events);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800197 } else {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700198 ALOGW(
199 "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
200 "index=%d",
201 event_fd, index);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800202 }
203 }
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700204 } while (count() == 0 && capacity() > 0 && !hung_up());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800205
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700206 return count() != 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800207}
208
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700209Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
210 int poll_events) {
211 if (!buffers_[slot]) {
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800212 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700213 return ErrorStatus(ENOENT);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800214 }
215
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700216 auto status = buffers_[slot]->GetEventMask(poll_events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800217 if (!status) {
218 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
219 status.GetErrorMessage().c_str());
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700220 return status.error_status();
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800221 }
222
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700223 const int events = status.get();
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800224 if (events & EPOLLIN) {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700225 auto entry_status = OnBufferReady(buffers_[slot], slot);
226 if (entry_status.ok() || entry_status.error() == EALREADY) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700227 // Only enqueue the buffer if it moves to or is already in the state
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700228 // requested in OnBufferReady().
229 return Enqueue(entry_status.take());
230 } else if (entry_status.error() == EBUSY) {
231 // If the buffer is busy this means that the buffer moved from released to
232 // posted when a new consumer was created before the ProducerQueue had a
233 // chance to regain it. This is a valid transition that we have to handle
234 // because edge triggered poll events latch the ready state even if it is
235 // later de-asserted -- don't enqueue or print an error log in this case.
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700236 } else {
237 ALOGE(
238 "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
239 "queue_id=%d buffer_id=%d: %s",
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700240 id(), buffers_[slot]->id(), entry_status.GetErrorMessage().c_str());
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800241 }
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800242 } else if (events & EPOLLHUP) {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700243 // Check to see if the current buffer in the slot hung up. This is a bit of
244 // paranoia to deal with the epoll set getting out of sync with the buffer
245 // slots.
246 auto poll_status = PollEvents(buffers_[slot], POLLIN);
247 if (!poll_status && poll_status.error() != ETIMEDOUT) {
248 ALOGE("BufferHubQueue::HandleBufferEvent: Failed to poll buffer: %s",
249 poll_status.GetErrorMessage().c_str());
250 return poll_status.error_status();
251 }
252
253 const bool hangup_pending = status.ok() && (poll_status.get() & EPOLLHUP);
254
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800255 ALOGW(
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700256 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
257 "event_fd=%d buffer_id=%d hangup_pending=%d poll_status=%x",
258 slot, buffers_[slot]->event_fd(), buffers_[slot]->id(), hangup_pending,
259 poll_status.get());
260
261 if (hangup_pending) {
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700262 return RemoveBuffer(slot);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800263 } else {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700264 // Clean up the bookkeeping for the event fd. This is a bit of paranoia to
265 // deal with the epoll set getting out of sync with the buffer slots.
266 // Hitting this path should be very unusual.
267 const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_fd, nullptr);
268 if (ret < 0) {
269 ALOGE(
270 "BufferHubQueue::HandleBufferEvent: Failed to remove fd=%d from "
271 "epoll set: %s",
272 event_fd, strerror(-ret));
273 return ErrorStatus(-ret);
274 }
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800275 }
276 } else {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700277 ALOGW(
278 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
279 "events=%d",
280 slot, events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800281 }
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700282
283 return {};
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800284}
285
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700286Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700287 auto status = GetEventMask(poll_event);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800288 if (!status) {
289 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
290 status.GetErrorMessage().c_str());
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700291 return status.error_status();
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800292 }
293
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700294 const int events = status.get();
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800295 if (events & EPOLLIN) {
296 // Note that after buffer imports, if |count()| still returns 0, epoll
297 // wait will be tried again to acquire the newly imported buffer.
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700298 auto buffer_status = OnBufferAllocated();
299 if (!buffer_status) {
300 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
301 buffer_status.GetErrorMessage().c_str());
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800302 }
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700303 } else if (events & EPOLLHUP) {
304 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
305 hung_up_ = true;
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800306 } else {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700307 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800308 }
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700309
310 return {};
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800311}
312
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700313Status<void> BufferHubQueue::AddBuffer(
314 const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
315 ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
316 buffer->id(), slot);
317
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800318 if (is_full()) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800319 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
320 capacity_);
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700321 return ErrorStatus(E2BIG);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800322 }
323
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700324 if (buffers_[slot]) {
325 // Replace the buffer if the slot is occupied. This could happen when the
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700326 // producer side replaced the slot with a newly allocated buffer. Remove the
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800327 // buffer before setting up with the new one.
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700328 auto remove_status = RemoveBuffer(slot);
329 if (!remove_status)
330 return remove_status.error_status();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800331 }
332
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700333 epoll_event event = {.events = EPOLLIN | EPOLLET,
334 .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
335 const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buffer->event_fd(), &event);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800336 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800337 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
338 strerror(-ret));
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700339 return ErrorStatus(-ret);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800340 }
341
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700342 buffers_[slot] = buffer;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800343 capacity_++;
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700344 return {};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800345}
346
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700347Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
348 ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700349
350 if (buffers_[slot]) {
351 const int ret =
352 epoll_fd_.Control(EPOLL_CTL_DEL, buffers_[slot]->event_fd(), nullptr);
353 if (ret < 0) {
354 ALOGE(
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700355 "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700356 "set: "
357 "%s",
358 strerror(-ret));
359 return ErrorStatus(-ret);
360 }
361
362 buffers_[slot] = nullptr;
363 capacity_--;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800364 }
365
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700366 return {};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800367}
368
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700369Status<void> BufferHubQueue::Enqueue(Entry entry) {
370 if (!is_full()) {
371 available_buffers_.Append(std::move(entry));
372 return {};
373 } else {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700374 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700375 return ErrorStatus(E2BIG);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800376 }
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800377}
378
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700379Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
380 int timeout, size_t* slot, void* meta, LocalHandle* fence) {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700381 ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
382 timeout);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800383
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700384 if (!WaitForBuffers(timeout))
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700385 return ErrorStatus(ETIMEDOUT);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800386
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700387 auto& entry = available_buffers_.Front();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800388
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700389 std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
390 *slot = entry.slot;
391 *fence = std::move(entry.fence);
392 if (meta && entry.metadata) {
393 std::copy(entry.metadata.get(), entry.metadata.get() + meta_size_,
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800394 reinterpret_cast<uint8_t*>(meta));
395 }
396
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700397 available_buffers_.PopFront();
398
399 return {std::move(buffer)};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800400}
401
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700402ProducerQueue::ProducerQueue(LocalChannelHandle handle)
403 : BASE(std::move(handle)) {
404 auto status = ImportQueue();
405 if (!status) {
406 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
407 status.GetErrorMessage().c_str());
408 Close(-status.error());
409 }
410}
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800411
Jiwen 'Steve' Cai6bffc672017-05-18 23:05:05 -0700412ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
413 const UsagePolicy& usage)
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700414 : BASE(BufferHubRPC::kClientPath) {
Jiwen 'Steve' Caicbd32bf2017-05-18 17:03:20 -0700415 auto status =
Jiwen 'Steve' Cai6bffc672017-05-18 23:05:05 -0700416 InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800417 if (!status) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800418 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
419 status.GetErrorMessage().c_str());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800420 Close(-status.error());
421 return;
422 }
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700423
Jiwen 'Steve' Cai6bffc672017-05-18 23:05:05 -0700424 SetupQueue(status.get());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800425}
426
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700427Status<void> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
428 uint32_t layer_count,
429 uint32_t format, uint64_t usage,
430 size_t* out_slot) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800431 if (out_slot == nullptr) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700432 ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700433 return ErrorStatus(EINVAL);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800434 }
435
436 if (is_full()) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800437 ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
438 capacity());
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700439 return ErrorStatus(E2BIG);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800440 }
441
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700442 const size_t kBufferCount = 1u;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800443 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
444 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
Hendrik Wagenaar108e84f2017-05-07 22:19:17 -0700445 width, height, layer_count, format, usage, kBufferCount);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800446 if (!status) {
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700447 ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
448 status.GetErrorMessage().c_str());
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700449 return status.error_status();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800450 }
451
452 auto buffer_handle_slots = status.take();
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800453 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != kBufferCount,
454 "BufferHubRPC::ProducerQueueAllocateBuffers should "
455 "return one and only one buffer handle.");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800456
457 // We only allocate one buffer at a time.
458 auto& buffer_handle = buffer_handle_slots[0].first;
459 size_t buffer_slot = buffer_handle_slots[0].second;
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700460 ALOGD_IF(TRACE,
461 "ProducerQueue::AllocateBuffer, new buffer, channel_handle: %d",
462 buffer_handle.value());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800463
464 *out_slot = buffer_slot;
465 return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
466 buffer_slot);
467}
468
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700469Status<void> ProducerQueue::AddBuffer(
470 const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700471 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700472 id(), buffer->id(), slot);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800473 // For producer buffer, we need to enqueue the newly added buffer
474 // immediately. Producer queue starts with all buffers in available state.
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700475 auto status = BufferHubQueue::AddBuffer(buffer, slot);
476 if (!status)
477 return status;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800478
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700479 return Enqueue(buffer, slot);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800480}
481
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700482Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700483 auto status =
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700484 InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800485 if (!status) {
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700486 ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700487 status.GetErrorMessage().c_str());
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700488 return status.error_status();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800489 }
490
Jiwen 'Steve' Caibb701db2017-05-23 11:15:33 -0700491 return BufferHubQueue::RemoveBuffer(slot);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800492}
493
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700494Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700495 int timeout, size_t* slot, LocalHandle* release_fence) {
496 if (slot == nullptr || release_fence == nullptr) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700497 ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
498 slot, release_fence);
499 return ErrorStatus(EINVAL);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700500 }
501
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700502 auto buffer_status =
503 BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
504 if (!buffer_status)
505 return buffer_status.error_status();
506
507 return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800508}
509
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700510Status<BufferHubQueue::Entry> ProducerQueue::OnBufferReady(
511 const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
512 ALOGD_IF(TRACE,
513 "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
514 id(), buffer->id(), slot);
515
516 // Avoid taking a transient reference, buffer is valid for the duration of
517 // this method call.
518 auto* producer_buffer = static_cast<BufferProducer*>(buffer.get());
519 LocalHandle release_fence;
520
521 const int ret = producer_buffer->Gain(&release_fence);
522 if (ret < 0)
523 return ErrorStatus(-ret);
524 else
525 return {{buffer, nullptr, std::move(release_fence), slot}};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800526}
527
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700528ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
529 : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700530 auto status = ImportQueue();
531 if (!status) {
532 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
533 status.GetErrorMessage().c_str());
534 Close(-status.error());
535 }
536
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700537 auto import_status = ImportBuffers();
538 if (import_status) {
539 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
540 import_status.get());
541 } else {
542 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
543 import_status.GetErrorMessage().c_str());
544 }
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800545}
546
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700547Status<size_t> ConsumerQueue::ImportBuffers() {
548 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800549 if (!status) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700550 ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
551 status.GetErrorMessage().c_str());
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700552 return status.error_status();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800553 }
554
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700555 int ret;
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700556 Status<void> last_error;
557 size_t imported_buffers_count = 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800558
559 auto buffer_handle_slots = status.take();
560 for (auto& buffer_handle_slot : buffer_handle_slots) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700561 ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700562 buffer_handle_slot.first.value());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800563
564 std::unique_ptr<BufferConsumer> buffer_consumer =
565 BufferConsumer::Import(std::move(buffer_handle_slot.first));
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700566
567 // Setup ignore state before adding buffer to the queue.
568 if (ignore_on_import_) {
569 ALOGD_IF(TRACE,
570 "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
571 "buffer_id=%d",
572 buffer_consumer->id());
573 ret = buffer_consumer->SetIgnore(true);
574 if (ret < 0) {
575 ALOGE(
576 "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
577 "imported buffer buffer_id=%d: %s",
578 buffer_consumer->id(), strerror(-ret));
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700579 last_error = ErrorStatus(-ret);
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700580 }
581 }
582
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700583 auto add_status =
584 AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
585 if (!add_status) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700586 ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700587 add_status.GetErrorMessage().c_str());
588 last_error = add_status;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800589 } else {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700590 imported_buffers_count++;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800591 }
592 }
593
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700594 if (imported_buffers_count > 0)
595 return {imported_buffers_count};
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700596 else
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700597 return last_error.error_status();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800598}
599
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700600Status<void> ConsumerQueue::AddBuffer(
601 const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700602 ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700603 id(), buffer->id(), slot);
604 auto status = BufferHubQueue::AddBuffer(buffer, slot);
605 if (!status)
606 return status;
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700607
608 // Check to see if the buffer is already signaled. This is necessary to catch
609 // cases where buffers are already available; epoll edge triggered mode does
610 // not fire until and edge transition when adding new buffers to the epoll
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700611 // set. Note that we only poll the fd events because HandleBufferEvent() takes
612 // care of checking the translated buffer events.
613 auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
614 if (!poll_status && poll_status.error() != ETIMEDOUT) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700615 ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700616 poll_status.GetErrorMessage().c_str());
617 return poll_status.error_status();
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700618 }
619
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700620 // Update accounting if the buffer is available.
621 if (poll_status)
622 return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get());
623 else
624 return {};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800625}
626
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700627Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700628 int timeout, size_t* slot, void* meta, size_t meta_size,
629 LocalHandle* acquire_fence) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800630 if (meta_size != meta_size_) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800631 ALOGE(
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700632 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
633 "does not match metadata size (%zu) for the queue.",
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800634 meta_size, meta_size_);
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700635 return ErrorStatus(EINVAL);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800636 }
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700637
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700638 if (slot == nullptr || acquire_fence == nullptr) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700639 ALOGE(
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700640 "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700641 "acquire_fence=%p",
642 slot, meta, acquire_fence);
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700643 return ErrorStatus(EINVAL);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700644 }
645
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700646 auto buffer_status =
647 BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
648 if (!buffer_status)
649 return buffer_status.error_status();
650
651 return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800652}
653
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700654Status<BufferHubQueue::Entry> ConsumerQueue::OnBufferReady(
655 const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
656 ALOGD_IF(TRACE,
657 "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
658 id(), buffer->id(), slot);
659
660 // Avoid taking a transient reference, buffer is valid for the duration of
661 // this method call.
662 auto* consumer_buffer = static_cast<BufferConsumer*>(buffer.get());
663 std::unique_ptr<uint8_t[]> metadata(meta_size_ ? new uint8_t[meta_size_]
664 : nullptr);
665 LocalHandle acquire_fence;
666
667 const int ret =
668 consumer_buffer->Acquire(&acquire_fence, metadata.get(), meta_size_);
669 if (ret < 0)
670 return ErrorStatus(-ret);
671 else
672 return {{buffer, std::move(metadata), std::move(acquire_fence), slot}};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800673}
674
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700675Status<void> ConsumerQueue::OnBufferAllocated() {
Corey Tabakab7ca5de2017-05-08 18:55:02 -0700676 ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
677
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700678 auto status = ImportBuffers();
679 if (!status) {
680 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
681 status.GetErrorMessage().c_str());
682 return ErrorStatus(status.error());
683 } else if (status.get() == 0) {
684 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
685 return ErrorStatus(ENOBUFS);
686 } else {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700687 ALOGD_IF(TRACE,
688 "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
689 status.get());
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700690 return {};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800691 }
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800692}
693
694} // namespace dvr
695} // namespace android