blob: 433db426afcc653f363533dcdf215a90e8e4236c [file] [log] [blame]
#include "include/private/dvr/buffer_hub_queue_client.h"

#include <errno.h>
#include <inttypes.h>
#include <log/log.h>
#include <string.h>
#include <sys/epoll.h>

#include <array>

#include <pdx/default_transport/client_channel.h>
#include <pdx/default_transport/client_channel_factory.h>
#include <pdx/file_handle.h>
#include <private/dvr/bufferhub_rpc.h>
13
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070014using android::pdx::ErrorStatus;
15using android::pdx::LocalChannelHandle;
16using android::pdx::Status;
17
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080018namespace android {
19namespace dvr {
20
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070021BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080022 : Client{pdx::default_transport::ClientChannel::Create(
23 std::move(channel_handle))},
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070024 meta_size_(0),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080025 buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -080026 epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080027 available_buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -070028 fences_(BufferHubQueue::kMaxQueueCapacity),
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070029 capacity_(0),
30 id_(-1) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080031 Initialize();
32}
33
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070034BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080035 : Client{pdx::default_transport::ClientChannelFactory::Create(
36 endpoint_path)},
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070037 meta_size_(0),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080038 buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -080039 epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080040 available_buffers_(BufferHubQueue::kMaxQueueCapacity),
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -070041 fences_(BufferHubQueue::kMaxQueueCapacity),
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070042 capacity_(0),
43 id_(-1) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080044 Initialize();
45}
46
47void BufferHubQueue::Initialize() {
48 int ret = epoll_fd_.Create();
49 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -080050 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
51 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080052 return;
53 }
54
55 epoll_event event = {.events = EPOLLIN | EPOLLET,
56 .data = {.u64 = static_cast<uint64_t>(
57 BufferHubQueue::kEpollQueueEventIndex)}};
58 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
59 if (ret < 0) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070060 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
Alex Vakulenko4fe60582017-02-02 11:35:59 -080061 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080062 }
63}
64
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070065Status<void> BufferHubQueue::ImportQueue() {
66 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080067 if (!status) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070068 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
69 status.GetErrorMessage().c_str());
70 return ErrorStatus(status.error());
71 } else {
72 SetupQueue(status.get().meta_size_bytes, status.get().id);
73 return {};
74 }
75}
76
77void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
78 meta_size_ = meta_size_bytes;
79 id_ = id;
80 meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
81}
82
83std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
84 if (auto status = CreateConsumerQueueHandle())
85 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
86 else
Alex Vakulenkoe4eec202017-01-27 14:41:04 -080087 return nullptr;
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070088}
89
Corey Tabaka8a4e6a92017-04-20 13:42:02 -070090std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
91 if (auto status = CreateConsumerQueueHandle())
92 return std::unique_ptr<ConsumerQueue>(
93 new ConsumerQueue(status.take(), true));
94 else
95 return nullptr;
96}
97
Corey Tabaka1db8a5d2017-03-22 02:12:52 -070098Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
99 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
100 if (!status) {
101 ALOGE(
102 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
103 "%s",
104 status.GetErrorMessage().c_str());
105 return ErrorStatus(status.error());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800106 }
107
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700108 return status;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800109}
110
111bool BufferHubQueue::WaitForBuffers(int timeout) {
112 std::array<epoll_event, kMaxEvents> events;
113
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700114 // Loop at least once to check for hangups.
115 do {
116 ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: count=%zu capacity=%zu",
117 count(), capacity());
118
119 // If there is already a buffer then just check for hangup without waiting.
120 const int ret = epoll_fd_.Wait(events.data(), events.size(),
121 count() == 0 ? timeout : 0);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800122
123 if (ret == 0) {
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700124 ALOGD_IF(TRACE, "Wait on epoll returns nothing before timeout.");
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700125 return count() != 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800126 }
127
128 if (ret < 0 && ret != -EINTR) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700129 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
130 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800131 return false;
132 }
133
134 const int num_events = ret;
135
136 // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
137 // one for each buffer, in the queue and one extra event for the queue
138 // client itself.
139 for (int i = 0; i < num_events; i++) {
140 int64_t index = static_cast<int64_t>(events[i].data.u64);
141
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700142 ALOGD_IF(TRACE, "New BufferHubQueue event %d: index=%" PRId64, i, index);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800143
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800144 if (is_buffer_event_index(index)) {
145 HandleBufferEvent(static_cast<size_t>(index), events[i]);
146 } else if (is_queue_event_index(index)) {
147 HandleQueueEvent(events[i]);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800148 } else {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700149 ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
150 index);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800151 }
152 }
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700153 } while (count() == 0 && capacity() > 0 && !hung_up());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800154
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700155 return count() != 0;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800156}
157
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800158void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
159 auto buffer = buffers_[slot];
160 if (!buffer) {
161 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
162 return;
163 }
164
165 auto status = buffer->GetEventMask(event.events);
166 if (!status) {
167 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
168 status.GetErrorMessage().c_str());
169 return;
170 }
171
172 int events = status.get();
173 if (events & EPOLLIN) {
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700174 int ret = OnBufferReady(buffer, &fences_[slot]);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800175 if (ret < 0) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700176 ALOGE("BufferHubQueue::HandleBufferEvent: Failed to set buffer ready: %s",
177 strerror(-ret));
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800178 return;
179 }
180 Enqueue(buffer, slot);
181 } else if (events & EPOLLHUP) {
182 // This might be caused by producer replacing an existing buffer slot, or
183 // when BufferHubQueue is shutting down. For the first case, currently the
184 // epoll FD is cleaned up when the replacement consumer client is imported,
185 // we shouldn't detach again if |epollhub_pending_[slot]| is set.
186 ALOGW(
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700187 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
188 "buffer event fd: %d, EPOLLHUP pending: %d",
Alex Vakulenkoa1336cf2017-03-31 08:29:28 -0700189 slot, buffer->event_fd(), int{epollhup_pending_[slot]});
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800190 if (epollhup_pending_[slot]) {
191 epollhup_pending_[slot] = false;
192 } else {
193 DetachBuffer(slot);
194 }
195 } else {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700196 ALOGW(
197 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
198 "events=%d",
199 slot, events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800200 }
201}
202
203void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
204 auto status = GetEventMask(event.events);
205 if (!status) {
206 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
207 status.GetErrorMessage().c_str());
208 return;
209 }
210
211 int events = status.get();
212 if (events & EPOLLIN) {
213 // Note that after buffer imports, if |count()| still returns 0, epoll
214 // wait will be tried again to acquire the newly imported buffer.
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700215 auto buffer_status = OnBufferAllocated();
216 if (!buffer_status) {
217 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
218 buffer_status.GetErrorMessage().c_str());
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800219 }
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700220 } else if (events & EPOLLHUP) {
221 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
222 hung_up_ = true;
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800223 } else {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700224 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%d", events);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800225 }
226}
227
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800228int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
229 size_t slot) {
230 if (is_full()) {
231 // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
232 // import buffer.
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800233 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
234 capacity_);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800235 return -E2BIG;
236 }
237
238 if (buffers_[slot] != nullptr) {
239 // Replace the buffer if the slot is preoccupied. This could happen when the
240 // producer side replaced the slot with a newly allocated buffer. Detach the
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800241 // buffer before setting up with the new one.
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800242 DetachBuffer(slot);
Jiwen 'Steve' Caidc14e5b2017-01-24 17:05:12 -0800243 epollhup_pending_[slot] = true;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800244 }
245
246 epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
247 const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
248 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800249 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
250 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800251 return ret;
252 }
253
254 buffers_[slot] = buf;
255 capacity_++;
256 return 0;
257}
258
259int BufferHubQueue::DetachBuffer(size_t slot) {
260 auto& buf = buffers_[slot];
261 if (buf == nullptr) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800262 ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800263 return -EINVAL;
264 }
265
266 const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
267 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800268 ALOGE(
269 "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
270 "%s",
271 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800272 return ret;
273 }
274
275 buffers_[slot] = nullptr;
276 capacity_--;
277 return 0;
278}
279
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700280void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800281 size_t slot) {
282 if (count() == capacity_) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700283 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800284 return;
285 }
286
287 // Set slot buffer back to vector.
288 // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
289 // the limitation of the RingBuffer we are using. Would be better to refactor
290 // that.
291 BufferInfo buffer_info(slot, meta_size_);
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700292 buffer_info.buffer = buf;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800293 // Swap metadata loaded during onBufferReady into vector.
294 std::swap(buffer_info.metadata, meta_buffer_tmp_);
295
296 available_buffers_.Append(std::move(buffer_info));
297}
298
299std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
300 size_t* slot,
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700301 void* meta,
302 LocalHandle* fence) {
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700303 ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800304
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700305 if (!WaitForBuffers(timeout))
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800306 return nullptr;
307
308 std::shared_ptr<BufferHubBuffer> buf;
309 BufferInfo& buffer_info = available_buffers_.Front();
310
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700311 *fence = std::move(fences_[buffer_info.slot]);
312
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800313 // Report current pos as the output slot.
314 std::swap(buffer_info.slot, *slot);
315 // Swap buffer from vector to be returned later.
316 std::swap(buffer_info.buffer, buf);
317 // Swap metadata from vector into tmp so that we can write out to |meta|.
318 std::swap(buffer_info.metadata, meta_buffer_tmp_);
319
320 available_buffers_.PopFront();
321
322 if (!buf) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700323 ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800324 return nullptr;
325 }
326
327 if (meta) {
328 std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
329 reinterpret_cast<uint8_t*>(meta));
330 }
331
332 return buf;
333}
334
335ProducerQueue::ProducerQueue(size_t meta_size)
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700336 : ProducerQueue(meta_size, 0, 0, 0, 0) {}
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800337
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700338ProducerQueue::ProducerQueue(LocalChannelHandle handle)
339 : BASE(std::move(handle)) {
340 auto status = ImportQueue();
341 if (!status) {
342 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
343 status.GetErrorMessage().c_str());
344 Close(-status.error());
345 }
346}
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800347
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700348ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
349 uint64_t usage_clear_mask,
350 uint64_t usage_deny_set_mask,
351 uint64_t usage_deny_clear_mask)
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700352 : BASE(BufferHubRPC::kClientPath) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800353 auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700354 meta_size, UsagePolicy{usage_set_mask, usage_clear_mask,
355 usage_deny_set_mask, usage_deny_clear_mask});
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800356 if (!status) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800357 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
358 status.GetErrorMessage().c_str());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800359 Close(-status.error());
360 return;
361 }
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700362
363 SetupQueue(status.get().meta_size_bytes, status.get().id);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800364}
365
Corey Tabakacd52dd92017-04-07 18:03:57 -0700366int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700367 uint32_t format, uint64_t usage,
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800368 size_t slice_count, size_t* out_slot) {
369 if (out_slot == nullptr) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700370 ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800371 return -EINVAL;
372 }
373
374 if (is_full()) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800375 ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
376 capacity());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800377 return -E2BIG;
378 }
379
380 const size_t kBufferCount = 1U;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800381 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
382 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
Jiwen 'Steve' Cai0057fdd2017-05-02 11:21:18 -0700383 width, height, format, usage, slice_count, kBufferCount);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800384 if (!status) {
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700385 ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
386 status.GetErrorMessage().c_str());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800387 return -status.error();
388 }
389
390 auto buffer_handle_slots = status.take();
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800391 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != kBufferCount,
392 "BufferHubRPC::ProducerQueueAllocateBuffers should "
393 "return one and only one buffer handle.");
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800394
395 // We only allocate one buffer at a time.
396 auto& buffer_handle = buffer_handle_slots[0].first;
397 size_t buffer_slot = buffer_handle_slots[0].second;
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700398 ALOGD_IF(TRACE,
399 "ProducerQueue::AllocateBuffer, new buffer, channel_handle: %d",
400 buffer_handle.value());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800401
402 *out_slot = buffer_slot;
403 return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
404 buffer_slot);
405}
406
407int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
408 size_t slot) {
409 // For producer buffer, we need to enqueue the newly added buffer
410 // immediately. Producer queue starts with all buffers in available state.
411 const int ret = BufferHubQueue::AddBuffer(buf, slot);
412 if (ret < 0)
413 return ret;
414
415 Enqueue(buf, slot);
416 return 0;
417}
418
419int ProducerQueue::DetachBuffer(size_t slot) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700420 auto status =
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800421 InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
422 if (!status) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800423 ALOGE(
424 "ProducerQueue::DetachBuffer failed to detach producer buffer through "
425 "BufferHub, error: %s",
426 status.GetErrorMessage().c_str());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800427 return -status.error();
428 }
429
430 return BufferHubQueue::DetachBuffer(slot);
431}
432
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700433std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(
434 int timeout, size_t* slot, LocalHandle* release_fence) {
435 if (slot == nullptr || release_fence == nullptr) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700436 ALOGE(
437 "ProducerQueue::Dequeue: invalid parameter, slot=%p, release_fence=%p",
438 slot, release_fence);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700439 return nullptr;
440 }
441
442 auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800443 return std::static_pointer_cast<BufferProducer>(buf);
444}
445
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700446int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700447 LocalHandle* release_fence) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800448 auto buffer = std::static_pointer_cast<BufferProducer>(buf);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700449 return buffer->Gain(release_fence);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800450}
451
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700452ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
453 : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700454 auto status = ImportQueue();
455 if (!status) {
456 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
457 status.GetErrorMessage().c_str());
458 Close(-status.error());
459 }
460
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700461 auto import_status = ImportBuffers();
462 if (import_status) {
463 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
464 import_status.get());
465 } else {
466 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
467 import_status.GetErrorMessage().c_str());
468 }
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800469}
470
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700471Status<size_t> ConsumerQueue::ImportBuffers() {
472 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800473 if (!status) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800474 ALOGE(
475 "ConsumerQueue::ImportBuffers failed to import consumer buffer through "
476 "BufferBub, error: %s",
477 status.GetErrorMessage().c_str());
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700478 return ErrorStatus(status.error());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800479 }
480
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700481 int ret;
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800482 int last_error = 0;
483 int imported_buffers = 0;
484
485 auto buffer_handle_slots = status.take();
486 for (auto& buffer_handle_slot : buffer_handle_slots) {
Jiwen 'Steve' Cai25fd3fa2017-03-20 15:30:21 -0700487 ALOGD_IF(TRACE,
488 "ConsumerQueue::ImportBuffers, new buffer, buffer_handle: %d",
489 buffer_handle_slot.first.value());
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800490
491 std::unique_ptr<BufferConsumer> buffer_consumer =
492 BufferConsumer::Import(std::move(buffer_handle_slot.first));
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700493
494 // Setup ignore state before adding buffer to the queue.
495 if (ignore_on_import_) {
496 ALOGD_IF(TRACE,
497 "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
498 "buffer_id=%d",
499 buffer_consumer->id());
500 ret = buffer_consumer->SetIgnore(true);
501 if (ret < 0) {
502 ALOGE(
503 "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
504 "imported buffer buffer_id=%d: %s",
505 buffer_consumer->id(), strerror(-ret));
506 last_error = ret;
507 }
508 }
509
510 ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800511 if (ret < 0) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800512 ALOGE("ConsumerQueue::ImportBuffers failed to add buffer, ret: %s",
513 strerror(-ret));
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800514 last_error = ret;
515 continue;
516 } else {
517 imported_buffers++;
518 }
519 }
520
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700521 if (imported_buffers > 0)
522 return {imported_buffers};
523 else
524 return ErrorStatus(-last_error);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800525}
526
527int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
528 size_t slot) {
529 // Consumer queue starts with all buffers in unavailable state.
530 return BufferHubQueue::AddBuffer(buf, slot);
531}
532
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700533std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(
534 int timeout, size_t* slot, void* meta, size_t meta_size,
535 LocalHandle* acquire_fence) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800536 if (meta_size != meta_size_) {
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800537 ALOGE(
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700538 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
539 "does not match metadata size (%zu) for the queue.",
Alex Vakulenko4fe60582017-02-02 11:35:59 -0800540 meta_size, meta_size_);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800541 return nullptr;
542 }
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700543
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700544 if (slot == nullptr || acquire_fence == nullptr) {
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700545 ALOGE(
546 "ConsumerQueue::Dequeue: Invalid parameter, slot=%p, meta=%p, "
547 "acquire_fence=%p",
548 slot, meta, acquire_fence);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700549 return nullptr;
550 }
551
552 auto buf = BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800553 return std::static_pointer_cast<BufferConsumer>(buf);
554}
555
Corey Tabaka8a4e6a92017-04-20 13:42:02 -0700556int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700557 LocalHandle* acquire_fence) {
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800558 auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
Jiwen 'Steve' Caied654322017-03-13 17:04:43 -0700559 return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800560}
561
Corey Tabaka1db8a5d2017-03-22 02:12:52 -0700562Status<void> ConsumerQueue::OnBufferAllocated() {
563 auto status = ImportBuffers();
564 if (!status) {
565 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
566 status.GetErrorMessage().c_str());
567 return ErrorStatus(status.error());
568 } else if (status.get() == 0) {
569 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
570 return ErrorStatus(ENOBUFS);
571 } else {
572 ALOGD_IF(TRACE, "Imported %zu consumer buffers.", status.get());
573 return {};
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800574 }
Alex Vakulenkoe4eec202017-01-27 14:41:04 -0800575}
576
577} // namespace dvr
578} // namespace android