blob: 08f1e9d1194d555fe73f603a7ecd89fe74a4b6dc [file] [log] [blame]
#include "producer_queue_channel.h"
#include "consumer_queue_channel.h"
#include "producer_channel.h"
using android::pdx::RemoteChannelHandle;
using android::pdx::rpc::DispatchRemoteMethod;
namespace android {
namespace dvr {
ProducerQueueChannel::ProducerQueueChannel(
BufferHubService* service, int channel_id, size_t meta_size_bytes,
int usage_set_mask, int usage_clear_mask, int usage_deny_set_mask,
int usage_deny_clear_mask, int* error)
: BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
meta_size_bytes_(meta_size_bytes),
usage_set_mask_(usage_set_mask),
usage_clear_mask_(usage_clear_mask),
usage_deny_set_mask_(usage_deny_set_mask),
usage_deny_clear_mask_(usage_deny_clear_mask),
capacity_(0) {
*error = 0;
}
ProducerQueueChannel::~ProducerQueueChannel() {}
/* static */
std::shared_ptr<ProducerQueueChannel> ProducerQueueChannel::Create(
BufferHubService* service, int channel_id, size_t meta_size_bytes,
int usage_set_mask, int usage_clear_mask, int usage_deny_set_mask,
int usage_deny_clear_mask, int* error) {
// Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
// should be mutually exclusive.
if (usage_deny_set_mask & usage_deny_clear_mask) {
ALOGE(
"BufferHubService::OnCreateProducerQueue: illegal usage mask "
"configuration: usage_deny_set_mask=%d, usage_deny_clear_mask=%d",
usage_deny_set_mask, usage_deny_clear_mask);
*error = -EINVAL;
return nullptr;
}
std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
service, channel_id, meta_size_bytes, usage_set_mask, usage_clear_mask,
usage_deny_set_mask, usage_deny_clear_mask, error));
if (*error < 0)
return nullptr;
else
return producer;
}
bool ProducerQueueChannel::HandleMessage(Message& message) {
ATRACE_NAME("ProducerQueueChannel::HandleMessage");
switch (message.GetOp()) {
case BufferHubRPC::CreateConsumerQueue::Opcode:
DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
*this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
return true;
case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
*this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
message);
return true;
case BufferHubRPC::ProducerQueueDetachBuffer::Opcode:
DispatchRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(
*this, &ProducerQueueChannel::OnProducerQueueDetachBuffer, message);
return true;
default:
return false;
}
}
void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
}
BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
usage_set_mask_, usage_clear_mask_, usage_deny_set_mask_,
usage_deny_clear_mask_);
}
std::pair<RemoteChannelHandle, size_t>
ProducerQueueChannel::OnCreateConsumerQueue(Message& message) {
ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
ALOGD_IF(TRACE, "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d",
channel_id());
int channel_id;
auto status = message.PushChannel(0, nullptr, &channel_id);
if (!status) {
ALOGE(
"ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
"channel: %s",
status.GetErrorMessage().c_str());
REPLY_ERROR_RETURN(message, ENOMEM, {});
}
const int ret = service()->SetChannel(
channel_id, std::make_shared<ConsumerQueueChannel>(
service(), buffer_id(), channel_id, shared_from_this()));
if (ret < 0) {
ALOGE(
"ProducerQueueChannel::OnCreateConsumerQueue: failed to set new "
"consumer channel: %s",
strerror(-ret));
REPLY_ERROR_RETURN(message, ENOMEM, {});
}
return std::make_pair(status.take(), meta_size_bytes_);
}
std::vector<std::pair<RemoteChannelHandle, size_t>>
ProducerQueueChannel::OnProducerQueueAllocateBuffers(Message& message,
int width, int height,
int format, int usage,
size_t slice_count,
size_t buffer_count) {
ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
ALOGD_IF(TRACE,
"ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
"producer_channel_id=%d",
channel_id());
std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
// Deny buffer allocation violating preset rules.
if (usage & usage_deny_set_mask_) {
ALOGE(
"ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %d is "
"not permitted. Violating usage_deny_set_mask, the following bits "
"shall not be set: %d.",
usage, usage_deny_set_mask_);
REPLY_ERROR_RETURN(message, EINVAL, buffer_handles);
}
if (~usage & usage_deny_clear_mask_) {
ALOGE(
"ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %d is "
"not permitted. Violating usage_deny_clear_mask, the following bits "
"must be set: %d.",
usage, usage_deny_clear_mask_);
REPLY_ERROR_RETURN(message, EINVAL, buffer_handles);
}
// Force set mask and clear mask. Note that |usage_set_mask_| takes precedence
// and will overwrite |usage_clear_mask_|.
int effective_usage = (usage & ~usage_clear_mask_) | usage_set_mask_;
for (size_t i = 0; i < buffer_count; i++) {
auto buffer_handle_slot = AllocateBuffer(message, width, height, format,
effective_usage, slice_count);
if (!buffer_handle_slot.first) {
ALOGE(
"ProducerQueueChannel::OnProducerQueueAllocateBuffers: failed to "
"allocate new buffer.");
REPLY_ERROR_RETURN(message, ENOMEM, buffer_handles);
}
buffer_handles.emplace_back(std::move(buffer_handle_slot.first),
buffer_handle_slot.second);
}
return buffer_handles;
}
std::pair<RemoteChannelHandle, size_t> ProducerQueueChannel::AllocateBuffer(
Message& message, int width, int height, int format, int usage,
size_t slice_count) {
ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
ALOGD_IF(TRACE,
"ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
channel_id());
if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
return {};
}
// Here we are creating a new BufferHubBuffer, initialize the producer
// channel, and returning its file handle back to the client.
// buffer_id is the id of the producer channel of BufferHubBuffer.
int buffer_id;
auto status = message.PushChannel(0, nullptr, &buffer_id);
if (!status) {
ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
status.GetErrorMessage().c_str());
return {};
}
ALOGD_IF(TRACE,
"ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%d "
"height=%d format=%d usage=%d slice_count=%zu",
buffer_id, width, height, format, usage, slice_count);
auto buffer_handle = status.take();
int error;
const auto producer_channel = ProducerChannel::Create(
service(), buffer_id, width, height, format, usage,
meta_size_bytes_, slice_count, &error);
if (!producer_channel) {
ALOGE(
"ProducerQueueChannel::AllocateBuffer: Failed to create "
"BufferHubBuffer producer!!");
return {};
}
ALOGD_IF(
TRACE,
"ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
buffer_id, buffer_handle.value());
const int ret = service()->SetChannel(buffer_id, producer_channel);
if (ret < 0) {
ALOGE(
"ProducerQueueChannel::AllocateBuffer: failed to set prodcuer channel "
"for new BufferHubBuffer: %s",
strerror(-ret));
return {};
}
// Register the newly allocated buffer's channel_id into the first empty
// buffer slot.
size_t slot = 0;
for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
if (buffers_[slot].expired())
break;
}
if (slot == BufferHubRPC::kMaxQueueCapacity) {
ALOGE(
"ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
"buffer allocation.");
return {};
}
buffers_[slot] = producer_channel;
capacity_++;
// Notify each consumer channel about the new buffer.
for (auto consumer_channel : consumer_channels_) {
ALOGD(
"ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
"buffer, buffer_id=%d",
buffer_id);
consumer_channel->RegisterNewBuffer(producer_channel, slot);
}
return {std::move(buffer_handle), slot};
}
int ProducerQueueChannel::OnProducerQueueDetachBuffer(Message& message,
size_t slot) {
if (buffers_[slot].expired()) {
ALOGE(
"ProducerQueueChannel::OnProducerQueueDetachBuffer: trying to detach "
"an invalid buffer producer at slot %zu",
slot);
return -EINVAL;
}
if (capacity_ == 0) {
ALOGE(
"ProducerQueueChannel::OnProducerQueueDetachBuffer: trying to detach a "
"buffer producer while the queue's capacity is already zero.");
return -EINVAL;
}
buffers_[slot].reset();
capacity_--;
return 0;
}
void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
consumer_channels_.push_back(channel);
}
void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
consumer_channels_.erase(
std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
}
} // namespace dvr
} // namespace android