blob: 4fbfcf680cdb876f8dd0149f47fb98b01c82c394 [file] [log] [blame]
#include "include/private/dvr/buffer_hub_queue_client.h"
#include <base/logging.h>
#include <sys/epoll.h>
#include <array>
#include <pdx/default_transport/client_channel.h>
#include <pdx/default_transport/client_channel_factory.h>
#include <pdx/file_handle.h>
#include <private/dvr/bufferhub_rpc.h>
using android::pdx::LocalHandle;
using android::pdx::LocalChannelHandle;
namespace android {
namespace dvr {
BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle,
size_t meta_size)
: Client{pdx::default_transport::ClientChannel::Create(
std::move(channel_handle))},
meta_size_(meta_size),
meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
buffers_(BufferHubQueue::kMaxQueueCapacity),
available_buffers_(BufferHubQueue::kMaxQueueCapacity),
capacity_(0) {
Initialize();
}
BufferHubQueue::BufferHubQueue(const std::string& endpoint_path,
size_t meta_size)
: Client{pdx::default_transport::ClientChannelFactory::Create(
endpoint_path)},
meta_size_(meta_size),
meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
buffers_(BufferHubQueue::kMaxQueueCapacity),
available_buffers_(BufferHubQueue::kMaxQueueCapacity),
capacity_(0) {
Initialize();
}
void BufferHubQueue::Initialize() {
int ret = epoll_fd_.Create();
if (ret < 0) {
LOG(ERROR) << "BufferHubQueue::BufferHubQueue: Failed to create epoll fd:"
<< strerror(-ret);
return;
}
epoll_event event = {.events = EPOLLIN | EPOLLET,
.data = {.u64 = static_cast<uint64_t>(
BufferHubQueue::kEpollQueueEventIndex)}};
ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
if (ret < 0) {
LOG(ERROR) << "Failed to register ConsumerQueue into epoll event: "
<< strerror(-ret);
}
}
std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
Status<std::pair<LocalChannelHandle, size_t>> status =
InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
if (!status) {
LOG(ERROR) << "Cannot create ConsumerQueue: " << status.GetErrorMessage();
return nullptr;
}
auto return_value = status.take();
VLOG(1) << "CreateConsumerQueue: meta_size_bytes=" << return_value.second;
return ConsumerQueue::Create(std::move(return_value.first),
return_value.second);
}
bool BufferHubQueue::WaitForBuffers(int timeout) {
std::array<epoll_event, kMaxEvents> events;
while (count() == 0) {
int ret = epoll_fd_.Wait(events.data(), events.size(), timeout);
if (ret == 0) {
VLOG(1) << "Wait on epoll returns nothing before timeout.";
return false;
}
if (ret < 0 && ret != -EINTR) {
LOG(ERROR) << "Failed to wait for buffers:" << strerror(-ret);
return false;
}
const int num_events = ret;
// A BufferQueue's epoll fd tracks N+1 events, where there are N events,
// one for each buffer, in the queue and one extra event for the queue
// client itself.
for (int i = 0; i < num_events; i++) {
int64_t index = static_cast<int64_t>(events[i].data.u64);
VLOG(1) << "New BufferHubQueue event " << i << ": index=" << index;
if (is_buffer_event_index(index) && (events[i].events & EPOLLIN)) {
auto buffer = buffers_[index];
ret = OnBufferReady(buffer);
if (ret < 0) {
LOG(ERROR) << "Failed to set buffer ready:" << strerror(-ret);
continue;
}
Enqueue(buffer, index);
} else if (is_buffer_event_index(index) &&
(events[i].events & EPOLLHUP)) {
// This maybe caused by producer replacing an exising buffer slot.
// Currently the epoll FD is cleaned up when the replacement consumer
// client is imported.
LOG(WARNING) << "Receives EPOLLHUP at slot: " << index;
} else if (is_queue_event_index(index) && (events[i].events & EPOLLIN)) {
// Note that after buffer imports, if |count()| still returns 0, epoll
// wait will be tried again to acquire the newly imported buffer.
ret = OnBufferAllocated();
if (ret < 0) {
LOG(ERROR) << "Failed to import buffer:" << strerror(-ret);
continue;
}
} else {
LOG(WARNING) << "Unknown event " << i << ": u64=" << index
<< ": events=" << events[i].events;
}
}
}
return true;
}
int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
size_t slot) {
if (is_full()) {
// TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
// import buffer.
LOG(ERROR) << "BufferHubQueue::AddBuffer queue is at maximum capacity: "
<< capacity_;
return -E2BIG;
}
if (buffers_[slot] != nullptr) {
// Replace the buffer if the slot is preoccupied. This could happen when the
// producer side replaced the slot with a newly allocated buffer. Detach the
// buffer and set up with the new one.
DetachBuffer(slot);
}
epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
if (ret < 0) {
LOG(ERROR)
<< "BufferHubQueue::AddBuffer: Failed to add buffer to epoll set:"
<< strerror(-ret);
return ret;
}
buffers_[slot] = buf;
capacity_++;
return 0;
}
int BufferHubQueue::DetachBuffer(size_t slot) {
auto& buf = buffers_[slot];
if (buf == nullptr) {
LOG(ERROR) << "BufferHubQueue::DetachBuffer: Invalid slot: " << slot;
return -EINVAL;
}
const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
if (ret < 0) {
LOG(ERROR) << "BufferHubQueue::DetachBuffer: Failed to detach buffer from "
"epoll set:"
<< strerror(-ret);
return ret;
}
buffers_[slot] = nullptr;
capacity_--;
return 0;
}
void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf,
size_t slot) {
if (count() == capacity_) {
LOG(ERROR) << "Buffer queue is full!";
return;
}
// Set slot buffer back to vector.
// TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
// the limitation of the RingBuffer we are using. Would be better to refactor
// that.
BufferInfo buffer_info(slot, meta_size_);
// Swap buffer into vector.
std::swap(buffer_info.buffer, buf);
// Swap metadata loaded during onBufferReady into vector.
std::swap(buffer_info.metadata, meta_buffer_tmp_);
available_buffers_.Append(std::move(buffer_info));
}
std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
size_t* slot,
void* meta) {
VLOG(1) << "Dequeue: count=" << count() << ", timeout=" << timeout;
if (count() == 0 && !WaitForBuffers(timeout))
return nullptr;
std::shared_ptr<BufferHubBuffer> buf;
BufferInfo& buffer_info = available_buffers_.Front();
// Report current pos as the output slot.
std::swap(buffer_info.slot, *slot);
// Swap buffer from vector to be returned later.
std::swap(buffer_info.buffer, buf);
// Swap metadata from vector into tmp so that we can write out to |meta|.
std::swap(buffer_info.metadata, meta_buffer_tmp_);
available_buffers_.PopFront();
if (!buf) {
LOG(ERROR) << "Dequeue: Buffer to be dequeued is nullptr";
return nullptr;
}
if (meta) {
std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
reinterpret_cast<uint8_t*>(meta));
}
return buf;
}
ProducerQueue::ProducerQueue(size_t meta_size)
: ProducerQueue(meta_size, 0, 0, 0, 0) {}
ProducerQueue::ProducerQueue(LocalChannelHandle handle, size_t meta_size)
: BASE(std::move(handle), meta_size) {}
ProducerQueue::ProducerQueue(size_t meta_size, int usage_set_mask,
int usage_clear_mask, int usage_deny_set_mask,
int usage_deny_clear_mask)
: BASE(BufferHubRPC::kClientPath, meta_size) {
auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
meta_size_, usage_set_mask, usage_clear_mask, usage_deny_set_mask,
usage_deny_clear_mask);
if (!status) {
LOG(ERROR)
<< "ProducerQueue::ProducerQueue: Failed to create producer queue: %s"
<< status.GetErrorMessage();
Close(-status.error());
return;
}
}
int ProducerQueue::AllocateBuffer(int width, int height, int format, int usage,
size_t slice_count, size_t* out_slot) {
if (out_slot == nullptr) {
LOG(ERROR) << "Parameter out_slot cannot be null.";
return -EINVAL;
}
if (is_full()) {
LOG(ERROR) << "ProducerQueue::AllocateBuffer queue is at maximum capacity: "
<< capacity();
return -E2BIG;
}
const size_t kBufferCount = 1U;
Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
width, height, format, usage, slice_count, kBufferCount);
if (!status) {
LOG(ERROR) << "ProducerQueue::AllocateBuffer failed to create producer "
"buffer through BufferHub.";
return -status.error();
}
auto buffer_handle_slots = status.take();
CHECK_EQ(buffer_handle_slots.size(), kBufferCount)
<< "BufferHubRPC::ProducerQueueAllocateBuffers should return one and "
"only one buffer handle.";
// We only allocate one buffer at a time.
auto& buffer_handle = buffer_handle_slots[0].first;
size_t buffer_slot = buffer_handle_slots[0].second;
VLOG(1) << "ProducerQueue::AllocateBuffer, new buffer, channel_handle: "
<< buffer_handle.value();
*out_slot = buffer_slot;
return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
buffer_slot);
}
int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
size_t slot) {
// For producer buffer, we need to enqueue the newly added buffer
// immediately. Producer queue starts with all buffers in available state.
const int ret = BufferHubQueue::AddBuffer(buf, slot);
if (ret < 0)
return ret;
Enqueue(buf, slot);
return 0;
}
int ProducerQueue::DetachBuffer(size_t slot) {
Status<int> status =
InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
if (!status) {
LOG(ERROR) << "ProducerQueue::DetachBuffer failed to detach producer "
"buffer through BufferHub, error: "
<< status.GetErrorMessage();
return -status.error();
}
return BufferHubQueue::DetachBuffer(slot);
}
std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(int timeout,
size_t* slot) {
auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr);
return std::static_pointer_cast<BufferProducer>(buf);
}
int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
auto buffer = std::static_pointer_cast<BufferProducer>(buf);
return buffer->GainAsync();
}
ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size)
: BASE(std::move(handle), meta_size) {
// TODO(b/34387835) Import consumer queue in case the ProducerQueue we are
// based on was not empty.
}
int ConsumerQueue::ImportBuffers() {
Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
if (!status) {
LOG(ERROR) << "ConsumerQueue::ImportBuffers failed to import consumer "
"buffer through BufferBub, error: "
<< status.GetErrorMessage();
return -status.error();
}
int last_error = 0;
int imported_buffers = 0;
auto buffer_handle_slots = status.take();
for (auto& buffer_handle_slot : buffer_handle_slots) {
VLOG(1) << "ConsumerQueue::ImportBuffers, new buffer, buffer_handle: "
<< buffer_handle_slot.first.value();
std::unique_ptr<BufferConsumer> buffer_consumer =
BufferConsumer::Import(std::move(buffer_handle_slot.first));
int ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
if (ret < 0) {
LOG(ERROR) << "ConsumerQueue::ImportBuffers failed to add buffer, ret: "
<< strerror(-ret);
last_error = ret;
continue;
} else {
imported_buffers++;
}
}
return imported_buffers > 0 ? imported_buffers : last_error;
}
int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
size_t slot) {
// Consumer queue starts with all buffers in unavailable state.
return BufferHubQueue::AddBuffer(buf, slot);
}
std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout,
size_t* slot, void* meta,
size_t meta_size) {
if (meta_size != meta_size_) {
LOG(ERROR) << "metadata size (" << meta_size
<< ") for the dequeuing buffer does not match metadata size ("
<< meta_size_ << ") for the queue.";
return nullptr;
}
auto buf = BufferHubQueue::Dequeue(timeout, slot, meta);
return std::static_pointer_cast<BufferConsumer>(buf);
}
int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
LocalHandle fence;
return buffer->Acquire(&fence, meta_buffer_tmp_.get(), meta_size_);
}
int ConsumerQueue::OnBufferAllocated() {
const int ret = ImportBuffers();
if (ret == 0) {
LOG(WARNING) << "No new buffer can be imported on buffer allocated event.";
} else if (ret < 0) {
LOG(ERROR) << "Failed to import buffers on buffer allocated event.";
}
VLOG(1) << "Imported " << ret << " consumer buffers.";
return ret;
}
} // namespace dvr
} // namespace android