Clean up BufferHubQueue API and internal bookkeeping.
- Simplify buffer hangup accounting.
- Add extra checks to gracefully handle the epoll set and slots array
being out of sync.
- Add tests for detaching buffers.
- Switch to using Status<T> for all return/error values.
- Fix minor bug in BufferHubQueueProducer from earlier Status<T>
return value change.
Bug: 36401174
Test: buffer_hub_queue-test passes.
Change-Id: If7f86a45cc048dc77daa2ede56585d3f882dd24f
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index 012a4e7..1978f41 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -23,34 +23,61 @@
using android::pdx::ErrorStatus;
using android::pdx::LocalChannelHandle;
+using android::pdx::LocalHandle;
using android::pdx::Status;
namespace android {
namespace dvr {
+namespace {
+
+// Polls an fd for the given events.
+Status<int> PollEvents(int fd, short events) {
+ const int kTimeoutMs = 0;
+ pollfd pfd{fd, events, 0};
+ const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
+ if (count < 0) {
+ return ErrorStatus(errno);
+ } else if (count == 0) {
+ return ErrorStatus(ETIMEDOUT);
+ } else {
+ return {pfd.revents};
+ }
+}
+
+// Polls a buffer for the given events, taking care to do the proper
+// translation.
+Status<int> PollEvents(const std::shared_ptr<BufferHubBuffer>& buffer,
+ short events) {
+ auto poll_status = PollEvents(buffer->event_fd(), events);
+ if (!poll_status)
+ return poll_status;
+
+ return buffer->GetEventMask(poll_status.get());
+}
+
+std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
+ return {static_cast<int32_t>(value >> 32),
+ static_cast<int32_t>(value & ((1ull << 32) - 1))};
+}
+
+uint64_t Stuff(int32_t a, int32_t b) {
+ const uint32_t ua = static_cast<uint32_t>(a);
+ const uint32_t ub = static_cast<uint32_t>(b);
+ return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
+}
+
+} // anonymous namespace
+
BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
: Client{pdx::default_transport::ClientChannel::Create(
- std::move(channel_handle))},
- meta_size_(0),
- buffers_(BufferHubQueue::kMaxQueueCapacity),
- epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
- available_buffers_(BufferHubQueue::kMaxQueueCapacity),
- fences_(BufferHubQueue::kMaxQueueCapacity),
- capacity_(0),
- id_(-1) {
+ std::move(channel_handle))} {
Initialize();
}
BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
- : Client{pdx::default_transport::ClientChannelFactory::Create(
- endpoint_path)},
- meta_size_(0),
- buffers_(BufferHubQueue::kMaxQueueCapacity),
- epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
- available_buffers_(BufferHubQueue::kMaxQueueCapacity),
- fences_(BufferHubQueue::kMaxQueueCapacity),
- capacity_(0),
- id_(-1) {
+ : Client{
+ pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
Initialize();
}
@@ -62,9 +89,9 @@
return;
}
- epoll_event event = {.events = EPOLLIN | EPOLLET,
- .data = {.u64 = static_cast<uint64_t>(
- BufferHubQueue::kEpollQueueEventIndex)}};
+ epoll_event event = {
+ .events = EPOLLIN | EPOLLET,
+ .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
if (ret < 0) {
ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
@@ -87,7 +114,6 @@
void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
meta_size_ = meta_size_bytes;
id_ = id;
- meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
}
std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
@@ -152,19 +178,24 @@
// one for each buffer, in the queue and one extra event for the queue
// client itself.
for (int i = 0; i < num_events; i++) {
- int64_t index = static_cast<int64_t>(events[i].data.u64);
+ int32_t event_fd;
+ int32_t index;
+ std::tie(event_fd, index) = Unstuff(events[i].data.u64);
ALOGD_IF(TRACE,
- "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
- index);
+ "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
+ i, event_fd, index);
if (is_buffer_event_index(index)) {
- HandleBufferEvent(static_cast<size_t>(index), events[i].events);
+ HandleBufferEvent(static_cast<size_t>(index), event_fd,
+ events[i].events);
} else if (is_queue_event_index(index)) {
HandleQueueEvent(events[i].events);
} else {
- ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
- index);
+ ALOGW(
+ "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
+ "index=%d",
+ event_fd, index);
}
}
} while (count() == 0 && capacity() > 0 && !hung_up());
@@ -172,52 +203,72 @@
return count() != 0;
}
-void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
- auto buffer = buffers_[slot];
- if (!buffer) {
+Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
+ int poll_events) {
+ if (!buffers_[slot]) {
ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
- return;
+ return ErrorStatus(ENOENT);
}
- auto status = buffer->GetEventMask(poll_events);
+ auto status = buffers_[slot]->GetEventMask(poll_events);
if (!status) {
ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
status.GetErrorMessage().c_str());
- return;
+ return status.error_status();
}
const int events = status.get();
if (events & EPOLLIN) {
- const int ret = OnBufferReady(buffer, &fences_[slot]);
- if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
+ auto entry_status = OnBufferReady(buffers_[slot], slot);
+ if (entry_status.ok() || entry_status.error() == EALREADY) {
// Only enqueue the buffer if it moves to or is already in the state
- // requested in OnBufferReady(). If the buffer is busy this means that the
- // buffer moved from released to posted when a new consumer was created
- // before the ProducerQueue had a chance to regain it. This is a valid
- // transition that we have to handle because edge triggered poll events
- // latch the ready state even if it is later de-asserted -- don't enqueue
- // or print an error log in this case.
- if (ret != -EBUSY)
- Enqueue(buffer, slot);
+ // requested in OnBufferReady().
+ return Enqueue(entry_status.take());
+ } else if (entry_status.error() == EBUSY) {
+ // If the buffer is busy this means that the buffer moved from released to
+ // posted when a new consumer was created before the ProducerQueue had a
+ // chance to regain it. This is a valid transition that we have to handle
+ // because edge triggered poll events latch the ready state even if it is
+ // later de-asserted -- don't enqueue or print an error log in this case.
} else {
ALOGE(
"BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
"queue_id=%d buffer_id=%d: %s",
- id(), buffer->id(), strerror(-ret));
+ id(), buffers_[slot]->id(), entry_status.GetErrorMessage().c_str());
}
} else if (events & EPOLLHUP) {
- // This might be caused by producer replacing an existing buffer slot, or
- // when BufferHubQueue is shutting down. For the first case, currently the
- // epoll FD is cleaned up when the replacement consumer client is imported,
- // we shouldn't detach again if |epollhub_pending_[slot]| is set.
+ // Check to see if the current buffer in the slot hung up. This is a bit of
+ // paranoia to deal with the epoll set getting out of sync with the buffer
+ // slots.
+ auto poll_status = PollEvents(buffers_[slot], POLLIN);
+ if (!poll_status && poll_status.error() != ETIMEDOUT) {
+ ALOGE("BufferHubQueue::HandleBufferEvent: Failed to poll buffer: %s",
+ poll_status.GetErrorMessage().c_str());
+ return poll_status.error_status();
+ }
+
+ const bool hangup_pending = status.ok() && (poll_status.get() & EPOLLHUP);
+
ALOGW(
- "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
- "buffer event fd: %d, EPOLLHUP pending: %d",
- slot, buffer->event_fd(), int{epollhup_pending_[slot]});
- if (epollhup_pending_[slot]) {
- epollhup_pending_[slot] = false;
+ "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
+ "event_fd=%d buffer_id=%d hangup_pending=%d poll_status=%x",
+ slot, buffers_[slot]->event_fd(), buffers_[slot]->id(), hangup_pending,
+ poll_status.get());
+
+ if (hangup_pending) {
+ return DetachBuffer(slot);
} else {
- DetachBuffer(slot);
+ // Clean up the bookkeeping for the event fd. This is a bit of paranoia to
+ // deal with the epoll set getting out of sync with the buffer slots.
+ // Hitting this path should be very unusual.
+ const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_fd, nullptr);
+ if (ret < 0) {
+ ALOGE(
+ "BufferHubQueue::HandleBufferEvent: Failed to remove fd=%d from "
+ "epoll set: %s",
+ event_fd, strerror(-ret));
+ return ErrorStatus(-ret);
+ }
}
} else {
ALOGW(
@@ -225,14 +276,16 @@
"events=%d",
slot, events);
}
+
+ return {};
}
-void BufferHubQueue::HandleQueueEvent(int poll_event) {
+Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
auto status = GetEventMask(poll_event);
if (!status) {
ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
status.GetErrorMessage().c_str());
- return;
+ return status.error_status();
}
const int events = status.get();
@@ -250,111 +303,97 @@
} else {
ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
}
+
+ return {};
}
-int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
- size_t slot) {
+Status<void> BufferHubQueue::AddBuffer(
+ const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
+ ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
+ buffer->id(), slot);
+
if (is_full()) {
- // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
- // import buffer.
ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
capacity_);
- return -E2BIG;
+ return ErrorStatus(E2BIG);
}
- if (buffers_[slot] != nullptr) {
- // Replace the buffer if the slot is preoccupied. This could happen when the
+ if (buffers_[slot]) {
+ // Replace the buffer if the slot is occupied. This could happen when the
// producer side replaced the slot with a newly allocated buffer. Detach the
// buffer before setting up with the new one.
- DetachBuffer(slot);
- epollhup_pending_[slot] = true;
+ auto detach_status = DetachBuffer(slot);
+ if (!detach_status)
+ return detach_status.error_status();
}
- epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
- const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
+ epoll_event event = {.events = EPOLLIN | EPOLLET,
+ .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
+ const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buffer->event_fd(), &event);
if (ret < 0) {
ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
strerror(-ret));
- return ret;
+ return ErrorStatus(-ret);
}
- buffers_[slot] = buf;
+ buffers_[slot] = buffer;
capacity_++;
- return 0;
+ return {};
}
-int BufferHubQueue::DetachBuffer(size_t slot) {
- auto& buf = buffers_[slot];
- if (buf == nullptr) {
- ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
- return -EINVAL;
+Status<void> BufferHubQueue::DetachBuffer(size_t slot) {
+ ALOGD_IF(TRACE, "BufferHubQueue::DetachBuffer: slot=%zu", slot);
+
+ if (buffers_[slot]) {
+ const int ret =
+ epoll_fd_.Control(EPOLL_CTL_DEL, buffers_[slot]->event_fd(), nullptr);
+ if (ret < 0) {
+ ALOGE(
+ "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll "
+ "set: "
+ "%s",
+ strerror(-ret));
+ return ErrorStatus(-ret);
+ }
+
+ buffers_[slot] = nullptr;
+ capacity_--;
}
- const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
- if (ret < 0) {
- ALOGE(
- "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
- "%s",
- strerror(-ret));
- return ret;
- }
-
- buffers_[slot] = nullptr;
- capacity_--;
- return 0;
+ return {};
}
-void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
- size_t slot) {
- if (count() == capacity_) {
+Status<void> BufferHubQueue::Enqueue(Entry entry) {
+ if (!is_full()) {
+ available_buffers_.Append(std::move(entry));
+ return {};
+ } else {
ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
- return;
+ return ErrorStatus(E2BIG);
}
-
- // Set slot buffer back to vector.
- // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
- // the limitation of the RingBuffer we are using. Would be better to refactor
- // that.
- BufferInfo buffer_info(slot, meta_size_);
- buffer_info.buffer = buf;
- // Swap metadata loaded during onBufferReady into vector.
- std::swap(buffer_info.metadata, meta_buffer_tmp_);
-
- available_buffers_.Append(std::move(buffer_info));
}
Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
int timeout, size_t* slot, void* meta, LocalHandle* fence) {
- ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
+ ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
+ timeout);
if (!WaitForBuffers(timeout))
return ErrorStatus(ETIMEDOUT);
- std::shared_ptr<BufferHubBuffer> buf;
- BufferInfo& buffer_info = available_buffers_.Front();
+ auto& entry = available_buffers_.Front();
- *fence = std::move(fences_[buffer_info.slot]);
-
- // Report current pos as the output slot.
- std::swap(buffer_info.slot, *slot);
- // Swap buffer from vector to be returned later.
- std::swap(buffer_info.buffer, buf);
- // Swap metadata from vector into tmp so that we can write out to |meta|.
- std::swap(buffer_info.metadata, meta_buffer_tmp_);
-
- available_buffers_.PopFront();
-
- if (!buf) {
- ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
- return ErrorStatus(ENOBUFS);
- }
-
- if (meta) {
- std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
+ std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
+ *slot = entry.slot;
+ *fence = std::move(entry.fence);
+ if (meta && entry.metadata) {
+ std::copy(entry.metadata.get(), entry.metadata.get() + meta_size_,
reinterpret_cast<uint8_t*>(meta));
}
- return {std::move(buf)};
+ available_buffers_.PopFront();
+
+ return {std::move(buffer)};
}
ProducerQueue::ProducerQueue(size_t meta_size)
@@ -388,28 +427,29 @@
SetupQueue(status.get().meta_size_bytes, status.get().id);
}
-int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
- uint32_t layer_count, uint32_t format,
- uint64_t usage, size_t* out_slot) {
+Status<void> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
+ uint32_t layer_count,
+ uint32_t format, uint64_t usage,
+ size_t* out_slot) {
if (out_slot == nullptr) {
ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
- return -EINVAL;
+ return ErrorStatus(EINVAL);
}
if (is_full()) {
ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
capacity());
- return -E2BIG;
+ return ErrorStatus(E2BIG);
}
- const size_t kBufferCount = 1U;
+ const size_t kBufferCount = 1u;
Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
width, height, layer_count, format, usage, kBufferCount);
if (!status) {
ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
status.GetErrorMessage().c_str());
- return -status.error();
+ return status.error_status();
}
auto buffer_handle_slots = status.take();
@@ -429,27 +469,26 @@
buffer_slot);
}
-int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
- size_t slot) {
+Status<void> ProducerQueue::AddBuffer(
+ const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
- id(), buf->id(), slot);
+ id(), buffer->id(), slot);
// For producer buffer, we need to enqueue the newly added buffer
// immediately. Producer queue starts with all buffers in available state.
- const int ret = BufferHubQueue::AddBuffer(buf, slot);
- if (ret < 0)
- return ret;
+ auto status = BufferHubQueue::AddBuffer(buffer, slot);
+ if (!status)
+ return status;
- Enqueue(buf, slot);
- return 0;
+ return Enqueue(buffer, slot);
}
-int ProducerQueue::DetachBuffer(size_t slot) {
+Status<void> ProducerQueue::DetachBuffer(size_t slot) {
auto status =
InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
if (!status) {
ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
status.GetErrorMessage().c_str());
- return -status.error();
+ return status.error_status();
}
return BufferHubQueue::DetachBuffer(slot);
@@ -471,12 +510,22 @@
return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
}
-int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
- LocalHandle* release_fence) {
- ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
- id(), buf->id());
- auto buffer = std::static_pointer_cast<BufferProducer>(buf);
- return buffer->Gain(release_fence);
+Status<BufferHubQueue::Entry> ProducerQueue::OnBufferReady(
+ const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
+ ALOGD_IF(TRACE,
+ "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
+ id(), buffer->id(), slot);
+
+ // Avoid taking a transient reference, buffer is valid for the duration of
+ // this method call.
+ auto* producer_buffer = static_cast<BufferProducer*>(buffer.get());
+ LocalHandle release_fence;
+
+ const int ret = producer_buffer->Gain(&release_fence);
+ if (ret < 0)
+ return ErrorStatus(-ret);
+ else
+ return {{buffer, nullptr, std::move(release_fence), slot}};
}
ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
@@ -503,12 +552,12 @@
if (!status) {
ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
status.GetErrorMessage().c_str());
- return ErrorStatus(status.error());
+ return status.error_status();
}
int ret;
- int last_error = 0;
- int imported_buffers = 0;
+ Status<void> last_error;
+ size_t imported_buffers_count = 0;
auto buffer_handle_slots = status.take();
for (auto& buffer_handle_slot : buffer_handle_slots) {
@@ -530,53 +579,52 @@
"ConsumerQueue::ImportBuffers: Failed to set ignored state on "
"imported buffer buffer_id=%d: %s",
buffer_consumer->id(), strerror(-ret));
- last_error = ret;
+ last_error = ErrorStatus(-ret);
}
}
- ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
- if (ret < 0) {
+ auto add_status =
+ AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
+ if (!add_status) {
ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
- strerror(-ret));
- last_error = ret;
- continue;
+ add_status.GetErrorMessage().c_str());
+ last_error = add_status;
} else {
- imported_buffers++;
+ imported_buffers_count++;
}
}
- if (imported_buffers > 0)
- return {imported_buffers};
+ if (imported_buffers_count > 0)
+ return {imported_buffers_count};
else
- return ErrorStatus(-last_error);
+ return last_error.error_status();
}
-int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
- size_t slot) {
+Status<void> ConsumerQueue::AddBuffer(
+ const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
- id(), buf->id(), slot);
- const int ret = BufferHubQueue::AddBuffer(buf, slot);
- if (ret < 0)
- return ret;
+ id(), buffer->id(), slot);
+ auto status = BufferHubQueue::AddBuffer(buffer, slot);
+ if (!status)
+ return status;
// Check to see if the buffer is already signaled. This is necessary to catch
// cases where buffers are already available; epoll edge triggered mode does
// not fire until and edge transition when adding new buffers to the epoll
- // set.
- const int kTimeoutMs = 0;
- pollfd pfd{buf->event_fd(), POLLIN, 0};
- const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
- if (count < 0) {
- const int error = errno;
+ // set. Note that we only poll the fd events because HandleBufferEvent() takes
+ // care of checking the translated buffer events.
+ auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
+ if (!poll_status && poll_status.error() != ETIMEDOUT) {
ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
- strerror(errno));
- return -error;
+ poll_status.GetErrorMessage().c_str());
+ return poll_status.error_status();
}
- if (count == 1)
- HandleBufferEvent(slot, pfd.revents);
-
- return 0;
+ // Update accounting if the buffer is available.
+ if (poll_status)
+ return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get());
+ else
+ return {};
}
Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
@@ -606,15 +654,30 @@
return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
}
-int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
- LocalHandle* acquire_fence) {
- ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
- id(), buf->id());
- auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
- return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
+Status<BufferHubQueue::Entry> ConsumerQueue::OnBufferReady(
+ const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
+ ALOGD_IF(TRACE,
+ "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d slot=%zu",
+ id(), buffer->id(), slot);
+
+ // Avoid taking a transient reference, buffer is valid for the duration of
+ // this method call.
+ auto* consumer_buffer = static_cast<BufferConsumer*>(buffer.get());
+ std::unique_ptr<uint8_t[]> metadata(meta_size_ ? new uint8_t[meta_size_]
+ : nullptr);
+ LocalHandle acquire_fence;
+
+ const int ret =
+ consumer_buffer->Acquire(&acquire_fence, metadata.get(), meta_size_);
+ if (ret < 0)
+ return ErrorStatus(-ret);
+ else
+ return {{buffer, std::move(metadata), std::move(acquire_fence), slot}};
}
Status<void> ConsumerQueue::OnBufferAllocated() {
+ ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
+
auto status = ImportBuffers();
if (!status) {
ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
index 87efeed..932aa37 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
@@ -159,6 +159,8 @@
for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) {
LocalHandle fence;
auto buffer_status = queue_->Dequeue(dequeue_timeout_ms_, &slot, &fence);
+ if (!buffer_status)
+ return NO_MEMORY;
buffer_producer = buffer_status.take();
if (!buffer_producer)
@@ -608,10 +610,12 @@
PixelFormat format,
uint64_t usage) {
size_t slot;
-
- if (queue_->AllocateBuffer(width, height, layer_count, format, usage, &slot) <
- 0) {
- ALOGE("Failed to allocate new buffer in BufferHub.");
+ auto status =
+ queue_->AllocateBuffer(width, height, layer_count, format, usage, &slot);
+ if (!status) {
+ ALOGE(
+ "BufferHubQueueProducer::AllocateBuffer: Failed to allocate buffer: %s",
+ status.GetErrorMessage().c_str());
return NO_MEMORY;
}
@@ -626,11 +630,11 @@
}
status_t BufferHubQueueProducer::RemoveBuffer(size_t slot) {
- int ret = queue_->DetachBuffer(slot);
- if (ret < 0) {
- ALOGE("BufferHubQueueProducer::RemoveBuffer failed through RPC, ret=%s",
- strerror(-ret));
- return ret;
+ auto status = queue_->DetachBuffer(slot);
+ if (!status) {
+ ALOGE("BufferHubQueueProducer::RemoveBuffer: Failed to detach buffer: %s",
+ status.GetErrorMessage().c_str());
+ return INVALID_OPERATION;
}
// Reset in memory objects related the the buffer.
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
index ed67f79..d8d326b 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
@@ -21,45 +21,39 @@
// automatically re-requeued when released by the remote side.
class BufferHubQueue : public pdx::Client {
public:
- using LocalHandle = pdx::LocalHandle;
- using LocalChannelHandle = pdx::LocalChannelHandle;
- template <typename T>
- using Status = pdx::Status<T>;
-
virtual ~BufferHubQueue() {}
- void Initialize();
- // Create a new consumer queue that is attached to the producer. Returns
+ // Creates a new consumer queue that is attached to the producer. Returns
// a new consumer queue client or nullptr on failure.
std::unique_ptr<ConsumerQueue> CreateConsumerQueue();
- // Create a new consumer queue that is attached to the producer. This queue
+ // Creates a new consumer queue that is attached to the producer. This queue
// sets each of its imported consumer buffers to the ignored state to avoid
// participation in lifecycle events.
std::unique_ptr<ConsumerQueue> CreateSilentConsumerQueue();
- // Return the default buffer width of this buffer queue.
+ // Returns the default buffer width of this buffer queue.
size_t default_width() const { return default_width_; }
- // Return the default buffer height of this buffer queue.
+ // Returns the default buffer height of this buffer queue.
size_t default_height() const { return default_height_; }
- // Return the default buffer format of this buffer queue.
+ // Returns the default buffer format of this buffer queue.
int32_t default_format() const { return default_format_; }
- // Create a new consumer in handle form for immediate transport over RPC.
- Status<LocalChannelHandle> CreateConsumerQueueHandle();
+ // Creates a new consumer in handle form for immediate transport over RPC.
+ pdx::Status<pdx::LocalChannelHandle> CreateConsumerQueueHandle();
- // Return the number of buffers avaiable for dequeue.
+ // Returns the number of buffers avaiable for dequeue.
size_t count() const { return available_buffers_.GetSize(); }
- // Return the total number of buffers that the queue is tracking.
+ // Returns the total number of buffers that the queue is tracking.
size_t capacity() const { return capacity_; }
- // Return the size of metadata structure associated with this BufferBubQueue.
+ // Returns the size of metadata structure associated with this queue.
size_t metadata_size() const { return meta_size_; }
- // Return whether the buffer queue is alrady full.
+ // Returns whether the buffer queue is full.
bool is_full() const { return available_buffers_.IsFull(); }
explicit operator bool() const { return epoll_fd_.IsValid(); }
@@ -68,7 +62,7 @@
return buffers_[slot];
}
- Status<int> GetEventMask(int events) {
+ pdx::Status<int> GetEventMask(int events) {
if (auto* client_channel = GetChannel()) {
return client_channel->GetEventMask(events);
} else {
@@ -86,81 +80,95 @@
// occurred.
bool HandleQueueEvents() { return WaitForBuffers(0); }
- // Enqueue a buffer marks buffer to be available (|Gain|'ed for producer
- // and |Acquire|'ed for consumer. This is only used for internal bookkeeping.
- void Enqueue(const std::shared_ptr<BufferHubBuffer>& buf, size_t slot);
-
- // |BufferHubQueue| will keep track of at most this value of buffers.
+ // The queue tracks at most this many buffers.
static constexpr size_t kMaxQueueCapacity =
android::BufferQueueDefs::NUM_BUFFER_SLOTS;
- // Special epoll data field indicating that the epoll event refers to the
- // queue.
- static constexpr int64_t kEpollQueueEventIndex = -1;
-
- // When pass |kNoTimeout| to |Dequeue|, it will block indefinitely without a
- // timeout.
static constexpr int kNoTimeOut = -1;
int id() const { return id_; }
bool hung_up() const { return hung_up_; }
protected:
- BufferHubQueue(LocalChannelHandle channel);
+ BufferHubQueue(pdx::LocalChannelHandle channel);
BufferHubQueue(const std::string& endpoint_path);
// Imports the queue parameters by querying BufferHub for the parameters for
// this channel.
- Status<void> ImportQueue();
+ pdx::Status<void> ImportQueue();
// Sets up the queue with the given parameters.
void SetupQueue(size_t meta_size_bytes_, int id);
- // Called by ProducerQueue::AddBuffer and ConsumerQueue::AddBuffer only. to
- // register a buffer for epoll and internal bookkeeping.
- int AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf, size_t slot);
+ // Register a buffer for management by the queue. Used by subclasses to add a
+ // buffer to internal bookkeeping.
+ pdx::Status<void> AddBuffer(const std::shared_ptr<BufferHubBuffer>& buffer,
+ size_t slot);
- // Called by ProducerQueue::DetachBuffer and ConsumerQueue::DetachBuffer only.
+ // Called by ProducerQueue::DetachBuffer and ConsumerQueue::DetachBuffer only
// to deregister a buffer for epoll and internal bookkeeping.
- virtual int DetachBuffer(size_t slot);
+ virtual pdx::Status<void> DetachBuffer(size_t slot);
// Dequeue a buffer from the free queue, blocking until one is available. The
// timeout argument specifies the number of milliseconds that |Dequeue()| will
- // block. Specifying a timeout of -1 causes |Dequeue()| to block indefinitely,
- // while specifying a timeout equal to zero cause |Dequeue()| to return
+ // block. Specifying a timeout of -1 causes Dequeue() to block indefinitely,
+ // while specifying a timeout equal to zero cause Dequeue() to return
// immediately, even if no buffers are available.
- pdx::Status<std::shared_ptr<BufferHubBuffer>> Dequeue(int timeout,
- size_t* slot,
- void* meta,
- LocalHandle* fence);
+ pdx::Status<std::shared_ptr<BufferHubBuffer>> Dequeue(
+ int timeout, size_t* slot, void* meta, pdx::LocalHandle* fence);
- // Wait for buffers to be released and re-add them to the queue.
+ // Waits for buffers to become available and adds them to the available queue.
bool WaitForBuffers(int timeout);
- void HandleBufferEvent(size_t slot, int poll_events);
- void HandleQueueEvent(int poll_events);
- virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
- LocalHandle* fence) = 0;
+ pdx::Status<void> HandleBufferEvent(size_t slot, int event_fd,
+ int poll_events);
+ pdx::Status<void> HandleQueueEvent(int poll_events);
+
+ // Entry in the ring buffer of available buffers that stores related
+ // per-buffer data.
+ struct Entry {
+ Entry() : slot(0) {}
+ Entry(const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot)
+ : buffer(buffer), slot(slot) {}
+ Entry(const std::shared_ptr<BufferHubBuffer>& buffer,
+ std::unique_ptr<uint8_t[]> metadata, pdx::LocalHandle fence,
+ size_t slot)
+ : buffer(buffer),
+ metadata(std::move(metadata)),
+ fence(std::move(fence)),
+ slot(slot) {}
+ Entry(Entry&&) = default;
+ Entry& operator=(Entry&&) = default;
+
+ std::shared_ptr<BufferHubBuffer> buffer;
+ std::unique_ptr<uint8_t[]> metadata;
+ pdx::LocalHandle fence;
+ size_t slot;
+ };
+
+ // Enqueues a buffer to the available list (Gained for producer or Acquireed
+ // for consumer).
+ pdx::Status<void> Enqueue(Entry entry);
+
+ virtual pdx::Status<Entry> OnBufferReady(
+ const std::shared_ptr<BufferHubBuffer>& buf, size_t slot) = 0;
// Called when a buffer is allocated remotely.
- virtual Status<void> OnBufferAllocated() { return {}; }
+ virtual pdx::Status<void> OnBufferAllocated() { return {}; }
- // Data members to handle arbitrary metadata passed through BufferHub. It is
- // fair to enforce that all buffers in the same queue share the same metadata
- // type. |meta_size_| is used to store the size of metadata on queue creation;
- // and |meta_buffer_tmp_| is allocated and resized to |meta_size_| on queue
- // creation to be later used as temporary space so that we can avoid
- // additional dynamic memory allocation in each |Enqueue| and |Dequeue| call.
- size_t meta_size_;
-
- // Here we intentionally choose |unique_ptr<uint8_t[]>| over vector<uint8_t>
- // to disallow dynamic resizing for stability reasons.
- std::unique_ptr<uint8_t[]> meta_buffer_tmp_;
+ // Size of the metadata that buffers in this queue cary.
+ size_t meta_size_{0};
private:
+ void Initialize();
+
+ // Special epoll data field indicating that the epoll event refers to the
+ // queue.
+ static constexpr int64_t kEpollQueueEventIndex = -1;
+
static constexpr size_t kMaxEvents = 128;
- // The |u64| data field of an epoll event is interpreted as int64_t:
+ // The u64 data field of an epoll event is interpreted as int64_t:
// When |index| >= 0 and |index| < kMaxQueueCapacity it refers to a specific
// element of |buffers_| as a direct index;
static bool is_buffer_event_index(int64_t index) {
@@ -168,47 +176,11 @@
index < static_cast<int64_t>(BufferHubQueue::kMaxQueueCapacity);
}
- // When |index| == kEpollQueueEventIndex, it refers to the queue itself.
+ // When |index| == kEpollQueueEventIndex it refers to the queue itself.
static bool is_queue_event_index(int64_t index) {
return index == BufferHubQueue::kEpollQueueEventIndex;
}
- struct BufferInfo {
- // A logical slot number that is assigned to a buffer at allocation time.
- // The slot number remains unchanged during the entire life cycle of the
- // buffer and should not be confused with the enqueue and dequeue order.
- size_t slot;
-
- // A BufferHubBuffer client.
- std::shared_ptr<BufferHubBuffer> buffer;
-
- // Metadata associated with the buffer.
- std::unique_ptr<uint8_t[]> metadata;
-
- BufferInfo() : BufferInfo(-1, 0) {}
-
- BufferInfo(size_t slot, size_t metadata_size)
- : slot(slot),
- buffer(nullptr),
- metadata(metadata_size ? new uint8_t[metadata_size] : nullptr) {}
-
- BufferInfo(BufferInfo&& other)
- : slot(other.slot),
- buffer(std::move(other.buffer)),
- metadata(std::move(other.metadata)) {}
-
- BufferInfo& operator=(BufferInfo&& other) {
- slot = other.slot;
- buffer = std::move(other.buffer);
- metadata = std::move(other.metadata);
- return *this;
- }
-
- private:
- BufferInfo(const BufferInfo&) = delete;
- void operator=(BufferInfo&) = delete;
- };
-
// Default buffer width that can be set to override the buffer width when a
// width and height of 0 are specified in AllocateBuffer.
size_t default_width_{1};
@@ -221,49 +193,18 @@
// isn't specified in AllocateBuffer.
int32_t default_format_{PIXEL_FORMAT_RGBA_8888};
- // Buffer queue:
- // |buffers_| tracks all |BufferHubBuffer|s created by this |BufferHubQueue|.
- std::vector<std::shared_ptr<BufferHubBuffer>> buffers_;
+ // Tracks the buffers belonging to this queue. Buffers are stored according to
+ // "slot" in this vector. Each slot is a logical id of the buffer within this
+ // queue regardless of its queue position or presence in the ring buffer.
+ std::vector<std::shared_ptr<BufferHubBuffer>> buffers_{kMaxQueueCapacity};
- // |epollhup_pending_| tracks whether a slot of |buffers_| get detached before
- // its corresponding EPOLLHUP event got handled. This could happen as the
- // following sequence:
- // 1. Producer queue's client side allocates a new buffer (at slot 1).
- // 2. Producer queue's client side replaces an existing buffer (at slot 0).
- // This is implemented by first detaching the buffer and then allocating a
- // new buffer.
- // 3. During the same epoll_wait, Consumer queue's client side gets EPOLLIN
- // event on the queue which indicates a new buffer is available and the
- // EPOLLHUP event for slot 0. Consumer handles these two events in order.
- // 4. Consumer client calls BufferHubRPC::ConsumerQueueImportBuffers and both
- // slot 0 and (the new) slot 1 buffer will be imported. During the import
- // of the buffer at slot 1, consumer client detaches the old buffer so that
- // the new buffer can be registered. At the same time
- // |epollhup_pending_[slot]| is marked to indicate that buffer at this slot
- // was detached prior to EPOLLHUP event.
- // 5. Consumer client continues to handle the EPOLLHUP. Since
- // |epollhup_pending_[slot]| is marked as true, it can safely ignore the
- // event without detaching the newly allocated buffer at slot 1.
- //
- // In normal situations where the previously described sequence doesn't
- // happen, an EPOLLHUP event should trigger a regular buffer detach.
- std::vector<bool> epollhup_pending_;
+ // Buffers and related data that are available for dequeue.
+ RingBuffer<Entry> available_buffers_{kMaxQueueCapacity};
- // |available_buffers_| uses |dvr::RingBuffer| to implementation queue
- // sematics. When |Dequeue|, we pop the front element from
- // |available_buffers_|, and that buffer's reference count will decrease by
- // one, while another reference in |buffers_| keeps the last reference to
- // prevent the buffer from being deleted.
- RingBuffer<BufferInfo> available_buffers_;
+ // Keeps track with how many buffers have been added into the queue.
+ size_t capacity_{0};
- // Fences (acquire fence for consumer and release fence for consumer) , one
- // for each buffer slot.
- std::vector<LocalHandle> fences_;
-
- // Keep track with how many buffers have been added into the queue.
- size_t capacity_;
-
- // Epoll fd used to wait for BufferHub events.
+ // Epoll fd used to manage buffer events.
EpollFileDescriptor epoll_fd_;
// Flag indicating that the other side hung up. For ProducerQueues this
@@ -273,7 +214,7 @@
bool hung_up_{false};
// Global id for the queue that is consistent across processes.
- int id_;
+ int id_{-1};
BufferHubQueue(const BufferHubQueue&) = delete;
void operator=(BufferHubQueue&) = delete;
@@ -317,15 +258,15 @@
usage_deny_set_mask, usage_deny_clear_mask);
}
- // Import a |ProducerQueue| from a channel handle.
- static std::unique_ptr<ProducerQueue> Import(LocalChannelHandle handle) {
+ // Import a ProducerQueue from a channel handle.
+ static std::unique_ptr<ProducerQueue> Import(pdx::LocalChannelHandle handle) {
return BASE::Create(std::move(handle));
}
// Get a buffer producer. Note that the method doesn't check whether the
// buffer slot has a valid buffer that has been allocated already. When no
- // buffer has been imported before it returns |nullptr|; otherwise it returns
- // a shared pointer to a |BufferProducer|.
+ // buffer has been imported before it returns nullptr; otherwise it returns
+ // a shared pointer to a BufferProducer.
std::shared_ptr<BufferProducer> GetBuffer(size_t slot) const {
return std::static_pointer_cast<BufferProducer>(
BufferHubQueue::GetBuffer(slot));
@@ -333,26 +274,30 @@
// Allocate producer buffer to populate the queue. Once allocated, a producer
// buffer is automatically enqueue'd into the ProducerQueue and available to
- // use (i.e. in |Gain|'ed mode).
- // Returns Zero on success and negative error code when buffer allocation
- // fails.
- int AllocateBuffer(uint32_t width, uint32_t height, uint32_t layer_count,
- uint32_t format, uint64_t usage, size_t* out_slot);
+ // use (i.e. in GAINED state).
+ pdx::Status<void> AllocateBuffer(uint32_t width, uint32_t height,
+ uint32_t layer_count, uint32_t format,
+ uint64_t usage, size_t* out_slot);
// Add a producer buffer to populate the queue. Once added, a producer buffer
- // is available to use (i.e. in |Gain|'ed mode).
- int AddBuffer(const std::shared_ptr<BufferProducer>& buf, size_t slot);
+ // is available to use (i.e. in GAINED state).
+ pdx::Status<void> AddBuffer(const std::shared_ptr<BufferProducer>& buffer,
+ size_t slot);
// Detach producer buffer from the queue.
- // Returns Zero on success and negative error code when buffer detach
- // fails.
- int DetachBuffer(size_t slot) override;
+ pdx::Status<void> DetachBuffer(size_t slot) override;
// Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
// and caller should call Post() once it's done writing to release the buffer
// to the consumer side.
pdx::Status<std::shared_ptr<BufferProducer>> Dequeue(
- int timeout, size_t* slot, LocalHandle* release_fence);
+ int timeout, size_t* slot, pdx::LocalHandle* release_fence);
+
+ // Enqueues a producer buffer in the queue.
+ pdx::Status<void> Enqueue(const std::shared_ptr<BufferProducer>& buffer,
+ size_t slot) {
+ return BufferHubQueue::Enqueue({buffer, slot});
+ }
private:
friend BASE;
@@ -361,13 +306,13 @@
// static template methods inherited from ClientBase, which take the same
// arguments as the constructors.
explicit ProducerQueue(size_t meta_size);
- ProducerQueue(LocalChannelHandle handle);
+ explicit ProducerQueue(pdx::LocalChannelHandle handle);
ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
uint64_t usage_clear_mask, uint64_t usage_deny_set_mask,
uint64_t usage_deny_clear_mask);
- int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
- LocalHandle* release_fence) override;
+ pdx::Status<Entry> OnBufferReady(
+ const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) override;
};
// Explicit specializations of ProducerQueue::Create for void metadata type.
@@ -399,7 +344,7 @@
// used to avoid participation in the buffer lifecycle by a consumer queue
// that is only used to spawn other consumer queues, such as in an
// intermediate service.
- static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle,
+ static std::unique_ptr<ConsumerQueue> Import(pdx::LocalChannelHandle handle,
bool ignore_on_import = false) {
return std::unique_ptr<ConsumerQueue>(
new ConsumerQueue(std::move(handle), ignore_on_import));
@@ -407,7 +352,7 @@
// Import newly created buffers from the service side.
// Returns number of buffers successfully imported or an error.
- Status<size_t> ImportBuffers();
+ pdx::Status<size_t> ImportBuffers();
// Dequeue a consumer buffer to read. The returned buffer in |Acquired|'ed
// mode, and caller should call Releasse() once it's done writing to release
@@ -417,33 +362,34 @@
// when the buffer is orignally created.
template <typename Meta>
pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
- int timeout, size_t* slot, Meta* meta, LocalHandle* acquire_fence) {
+ int timeout, size_t* slot, Meta* meta, pdx::LocalHandle* acquire_fence) {
return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
}
pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
- int timeout, size_t* slot, LocalHandle* acquire_fence) {
+ int timeout, size_t* slot, pdx::LocalHandle* acquire_fence) {
return Dequeue(timeout, slot, nullptr, 0, acquire_fence);
}
pdx::Status<std::shared_ptr<BufferConsumer>> Dequeue(
int timeout, size_t* slot, void* meta, size_t meta_size,
- LocalHandle* acquire_fence);
+ pdx::LocalHandle* acquire_fence);
private:
friend BufferHubQueue;
- ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import = false);
+ ConsumerQueue(pdx::LocalChannelHandle handle, bool ignore_on_import = false);
// Add a consumer buffer to populate the queue. Once added, a consumer buffer
// is NOT available to use until the producer side |Post| it. |WaitForBuffers|
// will catch the |Post| and |Acquire| the buffer to make it available for
// consumer.
- int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);
+ pdx::Status<void> AddBuffer(const std::shared_ptr<BufferConsumer>& buffer,
+ size_t slot);
- int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
- LocalHandle* acquire_fence) override;
+ pdx::Status<Entry> OnBufferReady(
+ const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) override;
- Status<void> OnBufferAllocated() override;
+ pdx::Status<void> OnBufferAllocated() override;
// Flag indicating that imported (consumer) buffers should be ignored when
// imported to avoid participating in the buffer ownership flow.
diff --git a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
index fe0b12a..ff2e146 100644
--- a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
+++ b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
@@ -6,6 +6,9 @@
#include <vector>
+// Enable/disable debug logging.
+#define TRACE 0
+
namespace android {
namespace dvr {
@@ -51,13 +54,16 @@
CreateConsumerQueue();
}
- void AllocateBuffer() {
+ void AllocateBuffer(size_t* slot_out = nullptr) {
// Create producer buffer.
size_t slot;
- int ret = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
- kBufferLayerCount, kBufferFormat,
- kBufferUsage, &slot);
- ASSERT_EQ(ret, 0);
+ auto status = producer_queue_->AllocateBuffer(
+ kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
+ kBufferUsage, &slot);
+ ASSERT_TRUE(status.ok());
+
+ if (slot_out)
+ *slot_out = slot;
}
protected:
@@ -94,13 +100,13 @@
}
TEST_F(BufferHubQueueTest, TestProducerConsumer) {
- const size_t nb_buffer = 16;
+ const size_t kBufferCount = 16;
size_t slot;
uint64_t seq;
ASSERT_TRUE(CreateQueues<uint64_t>());
- for (size_t i = 0; i < nb_buffer; i++) {
+ for (size_t i = 0; i < kBufferCount; i++) {
AllocateBuffer();
// Producer queue has all the available buffers on initialize.
@@ -120,14 +126,23 @@
ASSERT_EQ(consumer_queue_->capacity(), i + 1);
}
- for (size_t i = 0; i < nb_buffer; i++) {
+ // Use /dev/zero as a stand-in for a fence. As long as BufferHub does not need
+ // to merge fences, which only happens when multiple consumers release the
+ // same buffer with release fences, the file object should simply pass
+ // through.
+ LocalHandle post_fence("/dev/zero", O_RDONLY);
+ struct stat post_fence_stat;
+ ASSERT_EQ(0, fstat(post_fence.Get(), &post_fence_stat));
+
+ for (size_t i = 0; i < kBufferCount; i++) {
LocalHandle fence;
- // First time, there is no buffer available to dequeue.
+
+ // First time there is no buffer available to dequeue.
auto consumer_status = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
ASSERT_FALSE(consumer_status.ok());
ASSERT_EQ(ETIMEDOUT, consumer_status.error());
- // Make sure Producer buffer is Post()'ed so that it's ready to Accquire
+ // Make sure Producer buffer is POSTED so that it's ready to Accquire
// in the consumer's Dequeue() function.
auto producer_status = producer_queue_->Dequeue(0, &slot, &fence);
ASSERT_TRUE(producer_status.ok());
@@ -135,18 +150,133 @@
ASSERT_NE(nullptr, producer);
uint64_t seq_in = static_cast<uint64_t>(i);
- ASSERT_EQ(producer->Post({}, &seq_in, sizeof(seq_in)), 0);
+ ASSERT_EQ(producer->Post(post_fence, &seq_in, sizeof(seq_in)), 0);
- // Second time, the just |Post()|'ed buffer should be dequeued.
+ // Second time the just the POSTED buffer should be dequeued.
uint64_t seq_out = 0;
consumer_status = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence);
ASSERT_TRUE(consumer_status.ok());
+ EXPECT_TRUE(fence.IsValid());
+
+ struct stat acquire_fence_stat;
+ ASSERT_EQ(0, fstat(fence.Get(), &acquire_fence_stat));
+
+ // The file descriptors should refer to the same file object. Testing the
+ // device id and inode is a proxy for testing that the fds refer to the same
+ // file object.
+ EXPECT_NE(post_fence.Get(), fence.Get());
+ EXPECT_EQ(post_fence_stat.st_dev, acquire_fence_stat.st_dev);
+ EXPECT_EQ(post_fence_stat.st_ino, acquire_fence_stat.st_ino);
+
auto consumer = consumer_status.take();
ASSERT_NE(nullptr, consumer);
ASSERT_EQ(seq_in, seq_out);
}
}
+TEST_F(BufferHubQueueTest, TestDetach) {
+ ASSERT_TRUE(CreateProducerQueue<void>());
+
+ // Allocate buffers.
+ const size_t kBufferCount = 4u;
+ for (size_t i = 0; i < kBufferCount; i++) {
+ AllocateBuffer();
+ }
+ ASSERT_EQ(kBufferCount, producer_queue_->count());
+ ASSERT_EQ(kBufferCount, producer_queue_->capacity());
+
+ consumer_queue_ = producer_queue_->CreateConsumerQueue();
+ ASSERT_NE(nullptr, consumer_queue_);
+
+ // Check that buffers are correctly imported on construction.
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+ EXPECT_EQ(0u, consumer_queue_->count());
+
+ // Dequeue all the buffers and keep track of them in an array. This prevents
+ // the producer queue ring buffer ref counts from interfering with the tests.
+ struct Entry {
+ std::shared_ptr<BufferProducer> buffer;
+ LocalHandle fence;
+ size_t slot;
+ };
+ std::array<Entry, kBufferCount> buffers;
+
+ for (size_t i = 0; i < kBufferCount; i++) {
+ Entry* entry = &buffers[i];
+ auto producer_status =
+ producer_queue_->Dequeue(0, &entry->slot, &entry->fence);
+ ASSERT_TRUE(producer_status.ok());
+ entry->buffer = producer_status.take();
+ ASSERT_NE(nullptr, entry->buffer);
+ EXPECT_EQ(i, entry->slot);
+ }
+
+ // Detach a buffer and make sure both queues reflect the change.
+ ASSERT_TRUE(producer_queue_->DetachBuffer(buffers[0].slot));
+ EXPECT_EQ(kBufferCount - 1, producer_queue_->capacity());
+
+ // As long as the detached buffer is still alive the consumer queue won't know
+ // its gone.
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+ EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+
+ // Release the detached buffer.
+ buffers[0].buffer = nullptr;
+
+ // Now the consumer queue should know it's gone.
+ EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
+ EXPECT_EQ(kBufferCount - 1, consumer_queue_->capacity());
+
+ // Allocate a new buffer. This should take the first empty slot.
+ size_t slot;
+ AllocateBuffer(&slot);
+ ALOGE_IF(TRACE, "ALLOCATE %zu", slot);
+ EXPECT_EQ(buffers[0].slot, slot);
+ EXPECT_EQ(kBufferCount, producer_queue_->capacity());
+
+ // The consumer queue should pick up the new buffer.
+ EXPECT_EQ(kBufferCount - 1, consumer_queue_->capacity());
+ EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+
+ // Detach and allocate a buffer.
+ ASSERT_TRUE(producer_queue_->DetachBuffer(buffers[1].slot));
+ EXPECT_EQ(kBufferCount - 1, producer_queue_->capacity());
+ buffers[1].buffer = nullptr;
+
+ AllocateBuffer(&slot);
+ ALOGE_IF(TRACE, "ALLOCATE %zu", slot);
+ EXPECT_EQ(buffers[1].slot, slot);
+ EXPECT_EQ(kBufferCount, producer_queue_->capacity());
+
+ // The consumer queue should pick up the new buffer but the count shouldn't
+ // change.
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+ EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+
+ // Detach and allocate a buffer, but don't free the buffer right away.
+ ASSERT_TRUE(producer_queue_->DetachBuffer(buffers[2].slot));
+ EXPECT_EQ(kBufferCount - 1, producer_queue_->capacity());
+
+ AllocateBuffer(&slot);
+ ALOGE_IF(TRACE, "ALLOCATE %zu", slot);
+ EXPECT_EQ(buffers[2].slot, slot);
+ EXPECT_EQ(kBufferCount, producer_queue_->capacity());
+
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+ EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+
+ // Release the producer buffer to trigger a POLLHUP event for an already
+ // detached buffer.
+ buffers[2].buffer = nullptr;
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+ EXPECT_FALSE(consumer_queue_->HandleQueueEvents());
+ EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
+}
+
TEST_F(BufferHubQueueTest, TestMultipleConsumers) {
ASSERT_TRUE(CreateProducerQueue<void>());
@@ -347,10 +477,10 @@
// When allocation, leave out |set_mask| from usage bits on purpose.
size_t slot;
- int ret = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
- kBufferFormat, kBufferLayerCount,
- kBufferUsage & ~set_mask, &slot);
- ASSERT_EQ(0, ret);
+ auto status = producer_queue_->AllocateBuffer(
+ kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
+ kBufferUsage & ~set_mask, &slot);
+ ASSERT_TRUE(status.ok());
LocalHandle fence;
auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
@@ -365,10 +495,10 @@
// When allocation, add |clear_mask| into usage bits on purpose.
size_t slot;
- int ret = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
- kBufferLayerCount, kBufferFormat,
- kBufferUsage | clear_mask, &slot);
- ASSERT_EQ(0, ret);
+ auto status = producer_queue_->AllocateBuffer(
+ kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
+ kBufferUsage | clear_mask, &slot);
+ ASSERT_TRUE(status.ok());
LocalHandle fence;
auto p1_status = producer_queue_->Dequeue(0, &slot, &fence);
@@ -384,16 +514,17 @@
// Now that |deny_set_mask| is illegal, allocation without those bits should
// be able to succeed.
size_t slot;
- int ret = producer_queue_->AllocateBuffer(
+ auto status = producer_queue_->AllocateBuffer(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage & ~deny_set_mask, &slot);
- ASSERT_EQ(ret, 0);
+ ASSERT_TRUE(status.ok());
// While allocation with those bits should fail.
- ret = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
- kBufferLayerCount, kBufferFormat,
- kBufferUsage | deny_set_mask, &slot);
- ASSERT_EQ(ret, -EINVAL);
+ status = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
+ kBufferLayerCount, kBufferFormat,
+ kBufferUsage | deny_set_mask, &slot);
+ ASSERT_FALSE(status.ok());
+ ASSERT_EQ(EINVAL, status.error());
}
TEST_F(BufferHubQueueTest, TestUsageDenyClearMask) {
@@ -403,16 +534,17 @@
// Now that clearing |deny_clear_mask| is illegal (i.e. setting these bits are
// mandatory), allocation with those bits should be able to succeed.
size_t slot;
- int ret = producer_queue_->AllocateBuffer(
+ auto status = producer_queue_->AllocateBuffer(
kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
kBufferUsage | deny_clear_mask, &slot);
- ASSERT_EQ(ret, 0);
+ ASSERT_TRUE(status.ok());
// While allocation without those bits should fail.
- ret = producer_queue_->AllocateBuffer(kBufferWidth, kBufferHeight,
- kBufferLayerCount, kBufferFormat,
- kBufferUsage & ~deny_clear_mask, &slot);
- ASSERT_EQ(ret, -EINVAL);
+ status = producer_queue_->AllocateBuffer(
+ kBufferWidth, kBufferHeight, kBufferLayerCount, kBufferFormat,
+ kBufferUsage & ~deny_clear_mask, &slot);
+ ASSERT_FALSE(status.ok());
+ ASSERT_EQ(EINVAL, status.error());
}
} // namespace
diff --git a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp
index 2b6239f..c7692d0 100644
--- a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp
+++ b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp
@@ -192,7 +192,7 @@
EXPECT_EQ(NO_ERROR,
mProducer->query(NATIVE_WINDOW_MIN_UNDEQUEUED_BUFFERS, &value));
EXPECT_LE(0, value);
- EXPECT_GE(BufferQueueDefs::NUM_BUFFER_SLOTS, static_cast<size_t>(value));
+ EXPECT_GE(BufferQueueDefs::NUM_BUFFER_SLOTS, value);
EXPECT_EQ(NO_ERROR,
mProducer->query(NATIVE_WINDOW_CONSUMER_RUNNING_BEHIND, &value));
diff --git a/libs/vr/libdisplay/display_client.cpp b/libs/vr/libdisplay/display_client.cpp
index 935ca2e..72db0dc 100644
--- a/libs/vr/libdisplay/display_client.cpp
+++ b/libs/vr/libdisplay/display_client.cpp
@@ -138,13 +138,13 @@
ALOGD_IF(TRACE, "Surface::CreateQueue: Allocating %zu buffers...", capacity);
for (size_t i = 0; i < capacity; i++) {
size_t slot;
- const int ret = producer_queue->AllocateBuffer(width, height, layer_count,
- format, usage, &slot);
- if (ret < 0) {
+ auto allocate_status = producer_queue->AllocateBuffer(
+ width, height, layer_count, format, usage, &slot);
+ if (!allocate_status) {
ALOGE(
"Surface::CreateQueue: Failed to allocate buffer on queue_id=%d: %s",
- producer_queue->id(), strerror(-ret));
- return ErrorStatus(ENOMEM);
+ producer_queue->id(), allocate_status.GetErrorMessage().c_str());
+ return allocate_status.error_status();
}
ALOGD_IF(
TRACE,
diff --git a/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp b/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp
index 474e968..5d12020 100644
--- a/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp
+++ b/libs/vr/libdvr/tests/dvr_buffer_queue-test.cpp
@@ -40,10 +40,11 @@
void AllocateBuffers(size_t buffer_count) {
size_t out_slot;
for (size_t i = 0; i < buffer_count; i++) {
- int ret = GetProducerQueueFromDvrWriteBufferQueue(write_queue_)
- ->AllocateBuffer(kBufferWidth, kBufferHeight, kLayerCount,
- kBufferFormat, kBufferUsage, &out_slot);
- ASSERT_EQ(0, ret);
+ auto status =
+ GetProducerQueueFromDvrWriteBufferQueue(write_queue_)
+ ->AllocateBuffer(kBufferWidth, kBufferHeight, kLayerCount,
+ kBufferFormat, kBufferUsage, &out_slot);
+ ASSERT_TRUE(status.ok());
}
}