blob: a081af0197b5d6b78c907d7c173bc7bf73af7f65 [file] [log] [blame]
Alex Vakulenkoe4eec202017-01-27 14:41:04 -08001#include "include/private/dvr/buffer_hub_queue_client.h"
2
Alex Vakulenko4fe60582017-02-02 11:35:59 -08003#include <inttypes.h>
4#include <log/log.h>
Corey Tabaka2b99ee52017-05-04 10:56:05 -07005#include <poll.h>
Hendrik Wagenaar4d3590f2017-05-06 22:36:04 -07006#include <sys/epoll.h>
Alex Vakulenkoe4eec202017-01-27 14:41:04 -08007
8#include <array>
9
10#include <pdx/default_transport/client_channel.h>
11#include <pdx/default_transport/client_channel_factory.h>
12#include <pdx/file_handle.h>
13#include <private/dvr/bufferhub_rpc.h>
14
// Evaluates |fnc_call| and re-evaluates it for as long as the result is -1
// with errno set to EINTR; yields the result of the last evaluation. The
// expression is wrapped in an immediately-invoked, by-reference-capturing
// lambda so that the macro can itself be used as an expression. The result
// type is taken from decltype(fnc_call), which does not evaluate the call.
#define RETRY_EINTR(fnc_call)                 \
  ([&]() -> decltype(fnc_call) {              \
    decltype(fnc_call) result;                \
    do {                                      \
      result = (fnc_call);                    \
    } while (result == -1 && errno == EINTR); \
    return result;                            \
  })()
23
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070024using android::pdx::ErrorStatus;
25using android::pdx::LocalChannelHandle;
26using android::pdx::Status;
27
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080028namespace android {
29namespace dvr {
30
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070031BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080032 : Client{pdx::default_transport::ClientChannel::Create(
33 std::move(channel_handle))},
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070034 meta_size_(0),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080035 buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -080036 epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080037 available_buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -070038 fences_(BufferHubQueue::kMaxQueueCapacity),
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070039 capacity_(0),
40 id_(-1) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080041 Initialize();
42}
43
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070044BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080045 : Client{pdx::default_transport::ClientChannelFactory::Create(
46 endpoint_path)},
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070047 meta_size_(0),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080048 buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -080049 epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080050 available_buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -070051 fences_(BufferHubQueue::kMaxQueueCapacity),
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070052 capacity_(0),
53 id_(-1) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080054 Initialize();
55}
56
57void BufferHubQueue::Initialize() {
58 int ret = epoll_fd_.Create();
59 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -080060 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
61 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080062 return;
63 }
64
65 epoll_event event = {.events = EPOLLIN | EPOLLET,
66 .data = {.u64 = static_cast<uint64_t>(
67 BufferHubQueue::kEpollQueueEventIndex)}};
68 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
69 if (ret < 0) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070070 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
Alex Vakulenko4fe60582017-02-02 11:35:59 -080071 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080072 }
73}
74
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070075Status<void> BufferHubQueue::ImportQueue() {
76 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080077 if (!status) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070078 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
79 status.GetErrorMessage().c_str());
80 return ErrorStatus(status.error());
81 } else {
82 SetupQueue(status.get().meta_size_bytes, status.get().id);
83 return {};
84 }
85}
86
87void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
88 meta_size_ = meta_size_bytes;
89 id_ = id;
90 meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
91}
92
93std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
94 if (auto status = CreateConsumerQueueHandle())
95 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
96 else
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080097 return nullptr;
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070098}
99
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700100std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
101 if (auto status = CreateConsumerQueueHandle())
102 return std::unique_ptr<ConsumerQueue>(
103 new ConsumerQueue(status.take(), true));
104 else
105 return nullptr;
106}
107
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700108Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
109 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
110 if (!status) {
111 ALOGE(
112 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
113 "%s",
114 status.GetErrorMessage().c_str());
115 return ErrorStatus(status.error());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800116 }
117
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700118 return status;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800119}
120
121bool BufferHubQueue::WaitForBuffers(int timeout) {
122 std::array<epoll_event, kMaxEvents> events;
123
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700124 // Loop at least once to check for hangups.
125 do {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700126 ALOGD_IF(
127 TRACE,
128 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
129 id(), count(), capacity());
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700130
131 // If there is already a buffer then just check for hangup without waiting.
132 const int ret = epoll_fd_.Wait(events.data(), events.size(),
133 count() == 0 ? timeout : 0);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800134
135 if (ret == 0) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700136 ALOGI_IF(TRACE,
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700137 "BufferHubQueue::WaitForBuffers: No events before timeout: "
138 "queue_id=%d",
139 id());
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700140 return count() != 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800141 }
142
143 if (ret < 0 && ret != -EINTR) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700144 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
145 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800146 return false;
147 }
148
149 const int num_events = ret;
150
151 // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
152 // one for each buffer, in the queue and one extra event for the queue
153 // client itself.
154 for (int i = 0; i < num_events; i++) {
155 int64_t index = static_cast<int64_t>(events[i].data.u64);
156
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700157 ALOGD_IF(TRACE,
158 "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
159 index);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800160
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800161 if (is_buffer_event_index(index)) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700162 HandleBufferEvent(static_cast<size_t>(index), events[i].events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800163 } else if (is_queue_event_index(index)) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700164 HandleQueueEvent(events[i].events);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800165 } else {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700166 ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
167 index);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800168 }
169 }
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700170 } while (count() == 0 && capacity() > 0 && !hung_up());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800171
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700172 return count() != 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800173}
174
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700175void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800176 auto buffer = buffers_[slot];
177 if (!buffer) {
178 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
179 return;
180 }
181
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700182 auto status = buffer->GetEventMask(poll_events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800183 if (!status) {
184 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
185 status.GetErrorMessage().c_str());
186 return;
187 }
188
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700189 const int events = status.get();
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800190 if (events & EPOLLIN) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700191 const int ret = OnBufferReady(buffer, &fences_[slot]);
192 if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
193 // Only enqueue the buffer if it moves to or is already in the state
194 // requested in OnBufferReady(). If the buffer is busy this means that the
195 // buffer moved from released to posted when a new consumer was created
196 // before the ProducerQueue had a chance to regain it. This is a valid
197 // transition that we have to handle because edge triggered poll events
198 // latch the ready state even if it is later de-asserted -- don't enqueue
199 // or print an error log in this case.
200 if (ret != -EBUSY)
201 Enqueue(buffer, slot);
202 } else {
203 ALOGE(
204 "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
205 "queue_id=%d buffer_id=%d: %s",
206 id(), buffer->id(), strerror(-ret));
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800207 }
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800208 } else if (events & EPOLLHUP) {
209 // This might be caused by producer replacing an existing buffer slot, or
210 // when BufferHubQueue is shutting down. For the first case, currently the
211 // epoll FD is cleaned up when the replacement consumer client is imported,
212 // we shouldn't detach again if |epollhub_pending_[slot]| is set.
213 ALOGW(
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700214 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
215 "buffer event fd: %d, EPOLLHUP pending: %d",
Alex Vakulenkoa1336cf2017-03-31 08:29:28 -0700216 slot, buffer->event_fd(), int{epollhup_pending_[slot]});
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800217 if (epollhup_pending_[slot]) {
218 epollhup_pending_[slot] = false;
219 } else {
220 DetachBuffer(slot);
221 }
222 } else {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700223 ALOGW(
224 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
225 "events=%d",
226 slot, events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800227 }
228}
229
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700230void BufferHubQueue::HandleQueueEvent(int poll_event) {
231 auto status = GetEventMask(poll_event);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800232 if (!status) {
233 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
234 status.GetErrorMessage().c_str());
235 return;
236 }
237
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700238 const int events = status.get();
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800239 if (events & EPOLLIN) {
240 // Note that after buffer imports, if |count()| still returns 0, epoll
241 // wait will be tried again to acquire the newly imported buffer.
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700242 auto buffer_status = OnBufferAllocated();
243 if (!buffer_status) {
244 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
245 buffer_status.GetErrorMessage().c_str());
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800246 }
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700247 } else if (events & EPOLLHUP) {
248 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
249 hung_up_ = true;
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800250 } else {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700251 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800252 }
253}
254
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800255int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
256 size_t slot) {
257 if (is_full()) {
258 // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
259 // import buffer.
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800260 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
261 capacity_);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800262 return -E2BIG;
263 }
264
265 if (buffers_[slot] != nullptr) {
266 // Replace the buffer if the slot is preoccupied. This could happen when the
267 // producer side replaced the slot with a newly allocated buffer. Detach the
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800268 // buffer before setting up with the new one.
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800269 DetachBuffer(slot);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800270 epollhup_pending_[slot] = true;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800271 }
272
273 epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
274 const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
275 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800276 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
277 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800278 return ret;
279 }
280
281 buffers_[slot] = buf;
282 capacity_++;
283 return 0;
284}
285
286int BufferHubQueue::DetachBuffer(size_t slot) {
287 auto& buf = buffers_[slot];
288 if (buf == nullptr) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800289 ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800290 return -EINVAL;
291 }
292
293 const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
294 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800295 ALOGE(
296 "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
297 "%s",
298 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800299 return ret;
300 }
301
302 buffers_[slot] = nullptr;
303 capacity_--;
304 return 0;
305}
306
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700307void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800308 size_t slot) {
309 if (count() == capacity_) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700310 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800311 return;
312 }
313
314 // Set slot buffer back to vector.
315 // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
316 // the limitation of the RingBuffer we are using. Would be better to refactor
317 // that.
318 BufferInfo buffer_info(slot, meta_size_);
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700319 buffer_info.buffer = buf;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800320 // Swap metadata loaded during onBufferReady into vector.
321 std::swap(buffer_info.metadata, meta_buffer_tmp_);
322
323 available_buffers_.Append(std::move(buffer_info));
324}
325
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700326Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
327 int timeout, size_t* slot, void* meta, LocalHandle* fence) {
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700328 ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800329
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700330 if (!WaitForBuffers(timeout))
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700331 return ErrorStatus(ETIMEDOUT);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800332
333 std::shared_ptr<BufferHubBuffer> buf;
334 BufferInfo& buffer_info = available_buffers_.Front();
335
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700336 *fence = std::move(fences_[buffer_info.slot]);
337
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800338 // Report current pos as the output slot.
339 std::swap(buffer_info.slot, *slot);
340 // Swap buffer from vector to be returned later.
341 std::swap(buffer_info.buffer, buf);
342 // Swap metadata from vector into tmp so that we can write out to |meta|.
343 std::swap(buffer_info.metadata, meta_buffer_tmp_);
344
345 available_buffers_.PopFront();
346
347 if (!buf) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700348 ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700349 return ErrorStatus(ENOBUFS);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800350 }
351
352 if (meta) {
353 std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
354 reinterpret_cast<uint8_t*>(meta));
355 }
356
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700357 return {std::move(buf)};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800358}
359
360ProducerQueue::ProducerQueue(size_t meta_size)
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700361 : ProducerQueue(meta_size, 0, 0, 0, 0) {}
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800362
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700363ProducerQueue::ProducerQueue(LocalChannelHandle handle)
364 : BASE(std::move(handle)) {
365 auto status = ImportQueue();
366 if (!status) {
367 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
368 status.GetErrorMessage().c_str());
369 Close(-status.error());
370 }
371}
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800372
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700373ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
374 uint64_t usage_clear_mask,
375 uint64_t usage_deny_set_mask,
376 uint64_t usage_deny_clear_mask)
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700377 : BASE(BufferHubRPC::kClientPath) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800378 auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700379 meta_size, UsagePolicy{usage_set_mask, usage_clear_mask,
380 usage_deny_set_mask, usage_deny_clear_mask});
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800381 if (!status) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800382 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
383 status.GetErrorMessage().c_str());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800384 Close(-status.error());
385 return;
386 }
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700387
388 SetupQueue(status.get().meta_size_bytes, status.get().id);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800389}
390
Corey Tabakacd52dd92017-04-07 18:03:57 -0700391int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700392 uint32_t format, uint64_t usage,
Hendrik Wagenaar4d3590f2017-05-06 22:36:04 -0700393 size_t* out_slot) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800394 if (out_slot == nullptr) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700395 ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800396 return -EINVAL;
397 }
398
399 if (is_full()) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800400 ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
401 capacity());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800402 return -E2BIG;
403 }
404
405 const size_t kBufferCount = 1U;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800406 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
407 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
Hendrik Wagenaar4d3590f2017-05-06 22:36:04 -0700408 width, height, format, usage, kBufferCount);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800409 if (!status) {
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700410 ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
411 status.GetErrorMessage().c_str());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800412 return -status.error();
413 }
414
415 auto buffer_handle_slots = status.take();
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800416 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != kBufferCount,
417 "BufferHubRPC::ProducerQueueAllocateBuffers should "
418 "return one and only one buffer handle.");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800419
420 // We only allocate one buffer at a time.
421 auto& buffer_handle = buffer_handle_slots[0].first;
422 size_t buffer_slot = buffer_handle_slots[0].second;
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700423 ALOGD_IF(TRACE,
424 "ProducerQueue::AllocateBuffer, new buffer, channel_handle: %d",
425 buffer_handle.value());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800426
427 *out_slot = buffer_slot;
428 return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
429 buffer_slot);
430}
431
432int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
433 size_t slot) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700434 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
435 id(), buf->id(), slot);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800436 // For producer buffer, we need to enqueue the newly added buffer
437 // immediately. Producer queue starts with all buffers in available state.
438 const int ret = BufferHubQueue::AddBuffer(buf, slot);
439 if (ret < 0)
440 return ret;
441
442 Enqueue(buf, slot);
443 return 0;
444}
445
446int ProducerQueue::DetachBuffer(size_t slot) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700447 auto status =
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800448 InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
449 if (!status) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700450 ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
451 status.GetErrorMessage().c_str());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800452 return -status.error();
453 }
454
455 return BufferHubQueue::DetachBuffer(slot);
456}
457
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700458Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700459 int timeout, size_t* slot, LocalHandle* release_fence) {
460 if (slot == nullptr || release_fence == nullptr) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700461 ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
462 slot, release_fence);
463 return ErrorStatus(EINVAL);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700464 }
465
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700466 auto buffer_status =
467 BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
468 if (!buffer_status)
469 return buffer_status.error_status();
470
471 return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800472}
473
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700474int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700475 LocalHandle* release_fence) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700476 ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
477 id(), buf->id());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800478 auto buffer = std::static_pointer_cast<BufferProducer>(buf);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700479 return buffer->Gain(release_fence);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800480}
481
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700482ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
483 : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700484 auto status = ImportQueue();
485 if (!status) {
486 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
487 status.GetErrorMessage().c_str());
488 Close(-status.error());
489 }
490
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700491 auto import_status = ImportBuffers();
492 if (import_status) {
493 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
494 import_status.get());
495 } else {
496 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
497 import_status.GetErrorMessage().c_str());
498 }
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800499}
500
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700501Status<size_t> ConsumerQueue::ImportBuffers() {
502 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800503 if (!status) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700504 ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
505 status.GetErrorMessage().c_str());
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700506 return ErrorStatus(status.error());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800507 }
508
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700509 int ret;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800510 int last_error = 0;
511 int imported_buffers = 0;
512
513 auto buffer_handle_slots = status.take();
514 for (auto& buffer_handle_slot : buffer_handle_slots) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700515 ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700516 buffer_handle_slot.first.value());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800517
518 std::unique_ptr<BufferConsumer> buffer_consumer =
519 BufferConsumer::Import(std::move(buffer_handle_slot.first));
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700520
521 // Setup ignore state before adding buffer to the queue.
522 if (ignore_on_import_) {
523 ALOGD_IF(TRACE,
524 "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
525 "buffer_id=%d",
526 buffer_consumer->id());
527 ret = buffer_consumer->SetIgnore(true);
528 if (ret < 0) {
529 ALOGE(
530 "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
531 "imported buffer buffer_id=%d: %s",
532 buffer_consumer->id(), strerror(-ret));
533 last_error = ret;
534 }
535 }
536
537 ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800538 if (ret < 0) {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700539 ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800540 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800541 last_error = ret;
542 continue;
543 } else {
544 imported_buffers++;
545 }
546 }
547
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700548 if (imported_buffers > 0)
549 return {imported_buffers};
550 else
551 return ErrorStatus(-last_error);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800552}
553
554int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
555 size_t slot) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700556 ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
557 id(), buf->id(), slot);
558 const int ret = BufferHubQueue::AddBuffer(buf, slot);
559 if (ret < 0)
560 return ret;
561
562 // Check to see if the buffer is already signaled. This is necessary to catch
563 // cases where buffers are already available; epoll edge triggered mode does
564 // not fire until and edge transition when adding new buffers to the epoll
565 // set.
566 const int kTimeoutMs = 0;
567 pollfd pfd{buf->event_fd(), POLLIN, 0};
568 const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
569 if (count < 0) {
570 const int error = errno;
571 ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
572 strerror(errno));
573 return -error;
574 }
575
576 if (count == 1)
577 HandleBufferEvent(slot, pfd.revents);
578
579 return 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800580}
581
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700582Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700583 int timeout, size_t* slot, void* meta, size_t meta_size,
584 LocalHandle* acquire_fence) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800585 if (meta_size != meta_size_) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800586 ALOGE(
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700587 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
588 "does not match metadata size (%zu) for the queue.",
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800589 meta_size, meta_size_);
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700590 return ErrorStatus(EINVAL);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800591 }
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700592
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700593 if (slot == nullptr || acquire_fence == nullptr) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700594 ALOGE(
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700595 "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700596 "acquire_fence=%p",
597 slot, meta, acquire_fence);
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700598 return ErrorStatus(EINVAL);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700599 }
600
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700601 auto buffer_status =
602 BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
603 if (!buffer_status)
604 return buffer_status.error_status();
605
606 return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800607}
608
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700609int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700610 LocalHandle* acquire_fence) {
Corey Tabaka2b99ee52017-05-04 10:56:05 -0700611 ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
612 id(), buf->id());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800613 auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700614 return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800615}
616
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700617Status<void> ConsumerQueue::OnBufferAllocated() {
618 auto status = ImportBuffers();
619 if (!status) {
620 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
621 status.GetErrorMessage().c_str());
622 return ErrorStatus(status.error());
623 } else if (status.get() == 0) {
624 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
625 return ErrorStatus(ENOBUFS);
626 } else {
Corey Tabaka9d8bd092017-04-25 16:47:44 -0700627 ALOGD_IF(TRACE,
628 "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
629 status.get());
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700630 return {};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800631 }
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800632}
633
634} // namespace dvr
635} // namespace android