Create consumer state mask(s) immediately after the producer allocates a new buffer.

No functionality change in the current use cases.

Create consumer state mask(s) for a newly allocated buffer in the
producer queue, for all connected consumer channels, immediately before
signaling the consumer channels that a new buffer has been allocated in
the queue.

Previously, the consumer queue imported a newly allocated buffer upon
dequeue. While importing the buffer, after the IPC, bufferhubd did two
things:
1. create the consumer state mask
2. create the consumer channel

This changelist moves the first step ahead of the IPC, so that it
happens right after the buffer is allocated in the producer queue. The
second step is unaffected; in producer_channel.cpp the old
CreateConsumer() is split accordingly (see the condensed handler below).
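
The refactored handler in producer_channel.cpp, condensed from the diff
below (error logging elided), now claims the client state bit before
pushing the consumer channel over IPC:

    Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
      // Step 1: claim a free client state bit; retries on CAS contention.
      auto status = CreateConsumerStateMask();
      if (!status.ok()) {
        return status.error_status();
      }
      // Step 2: push the consumer channel over IPC. If this fails, the claimed
      // bit is released again via RemoveConsumerClientMask().
      return CreateConsumer(message, /*consumer_state_mask=*/status.get());
    }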

Test: AHardwareBufferTest BufferHubBuffer_test BufferHubMetadata_test
buffer_hub_binder_service-test buffer_hub_queue_producer-test
libsensor_test vrflinger_test buffer_hub-test
buffer_hub_queue-test dvr_buffer_queue-test dvr_api-test
dvr_display-test

Bug: 119112218

Change-Id: I313e7ea98fcb555e08560e4c7e9bf295e40e1f27
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index 397c0ae..c6e8ea9 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -248,21 +248,7 @@
   return {GetBuffer(BufferHubDefs::kFirstClientBitMask)};
 }
 
-Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) {
-  ATRACE_NAME("ProducerChannel::CreateConsumer");
-  ALOGD_IF(TRACE,
-           "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
-           buffer_id(), producer_owns_);
-
-  int channel_id;
-  auto status = message.PushChannel(0, nullptr, &channel_id);
-  if (!status) {
-    ALOGE(
-        "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
-        status.GetErrorMessage().c_str());
-    return ErrorStatus(ENOMEM);
-  }
-
+Status<uint64_t> ProducerChannel::CreateConsumerStateMask() {
   // Try find the next consumer state bit which has not been claimed by any
   // consumer yet.
   // memory_order_acquire is chosen here because all writes in other threads
@@ -277,7 +263,6 @@
         "consumers per producer: 63.");
     return ErrorStatus(E2BIG);
   }
-
   uint64_t updated_active_clients_bit_mask =
       current_active_clients_bit_mask | client_state_mask;
   // Set the updated value only if the current value stays the same as what was
@@ -286,28 +271,71 @@
   // thread, and the modification will be visible in other threads that acquire
   // active_clients_bit_mask_. If the comparison fails, load the result of
   // all writes from all threads to updated_active_clients_bit_mask.
-  if (!active_clients_bit_mask_->compare_exchange_weak(
-          current_active_clients_bit_mask, updated_active_clients_bit_mask,
-          std::memory_order_acq_rel, std::memory_order_acquire)) {
+  // Keep looking for the next available client state mask until one is
+  // claimed successfully or all client state masks are exhausted.
+  while (!active_clients_bit_mask_->compare_exchange_weak(
+      current_active_clients_bit_mask, updated_active_clients_bit_mask,
+      std::memory_order_acq_rel, std::memory_order_acquire)) {
     ALOGE("Current active clients bit mask is changed to %" PRIx64
-          ", which was expected to be %" PRIx64 ".",
+          ", which was expected to be %" PRIx64
+          ". Trying to generate a new client state mask to resolve race "
+          "condition.",
           updated_active_clients_bit_mask, current_active_clients_bit_mask);
-    return ErrorStatus(EBUSY);
+    client_state_mask = BufferHubDefs::FindNextAvailableClientStateMask(
+        current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
+    if (client_state_mask == 0ULL) {
+      ALOGE(
+          "ProducerChannel::CreateConsumer: reached the maximum mumber of "
+          "consumers per producer: 63.");
+      return ErrorStatus(E2BIG);
+    }
+    updated_active_clients_bit_mask =
+        current_active_clients_bit_mask | client_state_mask;
   }
 
-  auto consumer =
-      std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id,
-                                        client_state_mask, shared_from_this());
+  return {client_state_mask};
+}
+
+void ProducerChannel::RemoveConsumerClientMask(uint64_t consumer_state_mask) {
+  // Clear the buffer state and fence state for this consumer in case they
+  // were already set, due to a possible race between the producer posting
+  // the buffer and the consumer failing to create its channel.
+  buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+  fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+
+  // Restore the consumer state bit and make it visible in other threads that
+  // acquire the active_clients_bit_mask_.
+  active_clients_bit_mask_->fetch_and(~consumer_state_mask,
+                                      std::memory_order_release);
+}
+
+Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(
+    Message& message, uint64_t consumer_state_mask) {
+  ATRACE_NAME("ProducerChannel::CreateConsumer");
+  ALOGD_IF(TRACE,
+           "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
+           buffer_id(), producer_owns_);
+
+  int channel_id;
+  auto status = message.PushChannel(0, nullptr, &channel_id);
+  if (!status) {
+    ALOGE(
+        "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
+        status.GetErrorMessage().c_str());
+    RemoveConsumerClientMask(consumer_state_mask);
+    return ErrorStatus(ENOMEM);
+  }
+
+  auto consumer = std::make_shared<ConsumerChannel>(
+      service(), buffer_id(), channel_id, consumer_state_mask,
+      shared_from_this());
   const auto channel_status = service()->SetChannel(channel_id, consumer);
   if (!channel_status) {
     ALOGE(
         "ProducerChannel::CreateConsumer: failed to set new consumer channel: "
         "%s",
         channel_status.GetErrorMessage().c_str());
-    // Restore the consumer state bit and make it visible in other threads that
-    // acquire the active_clients_bit_mask_.
-    active_clients_bit_mask_->fetch_and(~client_state_mask,
-                                        std::memory_order_release);
+    RemoveConsumerClientMask(consumer_state_mask);
     return ErrorStatus(ENOMEM);
   }
 
@@ -327,7 +355,11 @@
 Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
   ATRACE_NAME("ProducerChannel::OnNewConsumer");
   ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id());
-  return CreateConsumer(message);
+  auto status = CreateConsumerStateMask();
+  if (!status.ok()) {
+    return status.error_status();
+  }
+  return CreateConsumer(message, /*consumer_state_mask=*/status.get());
 }
 
 Status<void> ProducerChannel::OnProducerPost(Message&,
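
For reference, below is a minimal standalone sketch of the lock-free
bit-claiming loop that CreateConsumerStateMask() above is built around.
It is a simplification: ClaimConsumerStateBit and FindNextFreeConsumerBit
are hypothetical names (the real code uses
BufferHubDefs::FindNextAvailableClientStateMask), orphaned-consumer
tracking is omitted, and the first client bit is assumed to belong to
the producer, leaving 63 consumer slots.

    #include <atomic>
    #include <cstdint>

    // Hypothetical stand-in for BufferHubDefs::FindNextAvailableClientStateMask:
    // returns the lowest consumer bit not yet claimed, or 0 if all 63 are taken.
    // Bit 0 is assumed to be reserved for the producer.
    static uint64_t FindNextFreeConsumerBit(uint64_t claimed) {
      for (int i = 1; i < 64; ++i) {
        const uint64_t bit = 1ULL << i;
        if ((claimed & bit) == 0ULL) return bit;
      }
      return 0ULL;
    }

    // Claims one consumer bit in the shared mask, or returns 0 if none is left.
    // On failure, compare_exchange_weak reloads |current| with the latest value,
    // so the loop picks a new candidate bit and retries instead of giving up.
    uint64_t ClaimConsumerStateBit(std::atomic<uint64_t>& active_clients_bit_mask) {
      uint64_t current = active_clients_bit_mask.load(std::memory_order_acquire);
      uint64_t bit = FindNextFreeConsumerBit(current);
      if (bit == 0ULL) return 0ULL;
      uint64_t updated = current | bit;
      while (!active_clients_bit_mask.compare_exchange_weak(
          current, updated, std::memory_order_acq_rel, std::memory_order_acquire)) {
        bit = FindNextFreeConsumerBit(current);
        if (bit == 0ULL) return 0ULL;  // raced until every consumer slot was taken
        updated = current | bit;
      }
      return bit;  // the caller now owns this bit
    }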