Add capability to do zero-copy read/writes to FMQ

Test: FMQ unit tests
Bug: 33282647

Change-Id: Ieb05e6188554089ee7fa8bf4f499a5bb53861005
diff --git a/tests/mq_test.cpp b/tests/mq_test.cpp
index 3748cfe..2f03904 100644
--- a/tests/mq_test.cpp
+++ b/tests/mq_test.cpp
@@ -28,6 +28,11 @@
     kFmqNotFull = 1 << 1,
 };
 
+typedef android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>
+          MessageQueueSync;
+typedef android::hardware::MessageQueue<uint8_t, android::hardware::kUnsynchronizedWrite>
+            MessageQueueUnsync;
+
 class SynchronizedReadWrites : public ::testing::Test {
 protected:
     virtual void TearDown() {
@@ -36,16 +41,14 @@
 
     virtual void SetUp() {
         static constexpr size_t kNumElementsInQueue = 2048;
-        mQueue = new (std::nothrow) android::hardware::MessageQueue<uint8_t,
-               android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue);
+        mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue);
         ASSERT_NE(nullptr, mQueue);
         ASSERT_TRUE(mQueue->isValid());
         mNumMessagesMax = mQueue->getQuantumCount();
         ASSERT_EQ(kNumElementsInQueue, mNumMessagesMax);
     }
 
-    android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>*
-            mQueue = nullptr;
+    MessageQueueSync* mQueue = nullptr;
     size_t mNumMessagesMax = 0;
 };
 
@@ -57,16 +60,14 @@
 
     virtual void SetUp() {
         static constexpr size_t kNumElementsInQueue = 2048;
-        mQueue = new (std::nothrow) android::hardware::MessageQueue<uint8_t,
-               android::hardware::kUnsynchronizedWrite>(kNumElementsInQueue);
+        mQueue = new (std::nothrow) MessageQueueUnsync(kNumElementsInQueue);
         ASSERT_NE(nullptr, mQueue);
         ASSERT_TRUE(mQueue->isValid());
         mNumMessagesMax = mQueue->getQuantumCount();
         ASSERT_EQ(kNumElementsInQueue, mNumMessagesMax);
     }
 
-    android::hardware::MessageQueue<uint8_t,
-            android::hardware::kUnsynchronizedWrite>* mQueue = nullptr;
+    MessageQueueUnsync* mQueue = nullptr;
     size_t mNumMessagesMax = 0;
 };
 
@@ -77,8 +78,7 @@
     }
     virtual void SetUp() {
         static constexpr size_t kNumElementsInQueue = 2048;
-        mQueue = new (std::nothrow) android::hardware::MessageQueue<
-            uint8_t, android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue);
+        mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue);
         ASSERT_NE(nullptr, mQueue);
         ASSERT_TRUE(mQueue->isValid());
         mNumMessagesMax = mQueue->getQuantumCount();
@@ -89,7 +89,7 @@
         std::atomic_init(&mFw, static_cast<uint32_t>(kFmqNotFull));
     }
 
-    android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>* mQueue;
+    MessageQueueSync* mQueue;
     std::atomic<uint32_t> mFw;
     size_t mNumMessagesMax = 0;
 };
@@ -101,9 +101,8 @@
   }
   virtual void SetUp() {
       static constexpr size_t kNumElementsInQueue = 2049;
-      mQueue = new (std::nothrow) android::hardware::MessageQueue<
-              uint8_t, android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue,
-                                                                  true /* configureEventFlagWord */);
+      mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue,
+                                                   true /* configureEventFlagWord */);
       ASSERT_NE(nullptr, mQueue);
       ASSERT_TRUE(mQueue->isValid());
       mNumMessagesMax = mQueue->getQuantumCount();
@@ -116,11 +115,20 @@
       std::atomic_init(evFlagWordPtr, static_cast<uint32_t>(kFmqNotFull));
   }
 
-  android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>* mQueue;
+  MessageQueueSync* mQueue;
   size_t mNumMessagesMax = 0;
 };
 
 /*
+ * Utility function to initialize data to be written to the FMQ
+ */
+inline void initData(uint8_t* data, size_t count) {
+    for (size_t i = 0; i < count; i++) {
+        data[i] = i & 0xFF;
+    }
+}
+
+/*
  * This thread will attempt to read and block. When wait returns
  * it checks if the kFmqNotEmpty bit is actually set.
  * If the read is succesful, it signals Wake to kFmqNotFull.
@@ -297,9 +305,7 @@
     ASSERT_LE(dataLen, mNumMessagesMax);
     uint8_t data[dataLen];
 
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+    initData(data, dataLen);
 
     ASSERT_TRUE(mQueue->write(data, dataLen));
     uint8_t readData[dataLen] = {};
@@ -308,9 +314,81 @@
 }
 
 /*
+ * Verify that a few bytes of data can be successfully written and read using
+ * beginRead/beginWrite/CommitRead/CommitWrite
+ */
+TEST_F(SynchronizedReadWrites, SmallInputTest2) {
+    const size_t dataLen = 16;
+    ASSERT_LE(dataLen, mNumMessagesMax);
+    uint8_t data[dataLen];
+
+    initData(data, dataLen);
+
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_TRUE(mQueue->beginWrite(dataLen, &tx));
+
+    ASSERT_TRUE(tx.copyTo(data, 0 /* startIdx */, dataLen));
+
+    ASSERT_TRUE(mQueue->commitWrite(dataLen));
+
+    uint8_t readData[dataLen] = {};
+
+    ASSERT_TRUE(mQueue->beginRead(dataLen, &tx));
+
+    ASSERT_TRUE(tx.copyFrom(readData, 0 /* startIdx */, dataLen));
+
+    ASSERT_TRUE(mQueue->commitRead(dataLen));
+
+    ASSERT_EQ(0, memcmp(data, readData, dataLen));
+}
+
+/*
+ * Verify that a few bytes of data can be successfully written and read using
+ * beginRead/beginWrite/CommitRead/CommitWrite as well as getSlot().
+ */
+TEST_F(SynchronizedReadWrites, SmallInputTest3) {
+    const size_t dataLen = 16;
+    ASSERT_LE(dataLen, mNumMessagesMax);
+    uint8_t data[dataLen];
+
+    initData(data, dataLen);
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_TRUE(mQueue->beginWrite(dataLen, &tx));
+
+    auto first = tx.getFirstRegion();
+    auto second = tx.getSecondRegion();
+
+    ASSERT_EQ(first.getLength() + second.getLength(),  dataLen);
+    for (size_t i = 0; i < dataLen; i++) {
+        uint8_t* ptr = tx.getSlot(i);
+        *ptr = data[i];
+    }
+
+    ASSERT_TRUE(mQueue->commitWrite(dataLen));
+
+    uint8_t readData[dataLen] = {};
+
+    ASSERT_TRUE(mQueue->beginRead(dataLen, &tx));
+
+    first = tx.getFirstRegion();
+    second = tx.getSecondRegion();
+
+    ASSERT_EQ(first.getLength() + second.getLength(),  dataLen);
+
+    for (size_t i = 0; i < dataLen; i++) {
+        uint8_t* ptr = tx.getSlot(i);
+        readData[i] = *ptr;
+    }
+
+    ASSERT_TRUE(mQueue->commitRead(dataLen));
+
+    ASSERT_EQ(0, memcmp(data, readData, dataLen));
+}
+
+/*
  * Verify that read() returns false when trying to read from an empty queue.
  */
-TEST_F(SynchronizedReadWrites, ReadWhenEmpty) {
+TEST_F(SynchronizedReadWrites, ReadWhenEmpty1) {
     ASSERT_EQ(0UL, mQueue->availableToRead());
     const size_t dataLen = 2;
     ASSERT_LE(dataLen, mNumMessagesMax);
@@ -319,18 +397,33 @@
 }
 
 /*
+ * Verify that beginRead() returns a MemTransaction object with null pointers when trying
+ * to read from an empty queue.
+ */
+TEST_F(SynchronizedReadWrites, ReadWhenEmpty2) {
+    ASSERT_EQ(0UL, mQueue->availableToRead());
+    const size_t dataLen = 2;
+    ASSERT_LE(dataLen, mNumMessagesMax);
+
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_FALSE(mQueue->beginRead(dataLen, &tx));
+
+    auto first = tx.getFirstRegion();
+    auto second = tx.getSecondRegion();
+
+    ASSERT_EQ(nullptr, first.getAddress());
+    ASSERT_EQ(nullptr, second.getAddress());
+}
+
+/*
  * Write the queue until full. Verify that another write is unsuccessful.
  * Verify that availableToWrite() returns 0 as expected.
  */
-
-TEST_F(SynchronizedReadWrites, WriteWhenFull) {
+TEST_F(SynchronizedReadWrites, WriteWhenFull1) {
     ASSERT_EQ(0UL, mQueue->availableToRead());
     std::vector<uint8_t> data(mNumMessagesMax);
 
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
-
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
     ASSERT_EQ(0UL, mQueue->availableToWrite());
     ASSERT_FALSE(mQueue->write(&data[0], 1));
@@ -341,16 +434,35 @@
 }
 
 /*
+ * Write the queue until full. Verify that beginWrite() returns
+ * a MemTransaction object with null base pointers.
+ */
+TEST_F(SynchronizedReadWrites, WriteWhenFull2) {
+    ASSERT_EQ(0UL, mQueue->availableToRead());
+    std::vector<uint8_t> data(mNumMessagesMax);
+
+    initData(&data[0], mNumMessagesMax);
+    ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
+    ASSERT_EQ(0UL, mQueue->availableToWrite());
+
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_FALSE(mQueue->beginWrite(1, &tx));
+
+    auto first = tx.getFirstRegion();
+    auto second = tx.getSecondRegion();
+
+    ASSERT_EQ(nullptr, first.getAddress());
+    ASSERT_EQ(nullptr, second.getAddress());
+}
+
+/*
  * Write a chunk of data equal to the queue size.
  * Verify that the write is successful and the subsequent read
  * returns the expected data.
  */
 TEST_F(SynchronizedReadWrites, LargeInputTest1) {
     std::vector<uint8_t> data(mNumMessagesMax);
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
-
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
     std::vector<uint8_t> readData(mNumMessagesMax);
     ASSERT_TRUE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -367,9 +479,8 @@
     const size_t dataLen = 4096;
     ASSERT_GT(dataLen, mNumMessagesMax);
     std::vector<uint8_t> data(dataLen);
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+
+    initData(&data[0], dataLen);
     ASSERT_FALSE(mQueue->write(&data[0], dataLen));
     std::vector<uint8_t> readData(mNumMessagesMax);
     ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -384,9 +495,7 @@
  */
 TEST_F(SynchronizedReadWrites, LargeInputTest3) {
     std::vector<uint8_t> data(mNumMessagesMax);
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
     ASSERT_FALSE(mQueue->write(&data[0], 1));
     std::vector<uint8_t> readData(mNumMessagesMax);
@@ -395,6 +504,26 @@
 }
 
 /*
+ * Verify that beginWrite() returns a MemTransaction with
+ * null base pointers when attempting to write data larger
+ * than the queue size.
+ */
+TEST_F(SynchronizedReadWrites, LargeInputTest4) {
+    ASSERT_EQ(0UL, mQueue->availableToRead());
+    const size_t dataLen = 4096;
+    ASSERT_GT(dataLen, mNumMessagesMax);
+
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_FALSE(mQueue->beginWrite(dataLen, &tx));
+
+    auto first = tx.getFirstRegion();
+    auto second = tx.getSecondRegion();
+
+    ASSERT_EQ(nullptr, first.getAddress());
+    ASSERT_EQ(nullptr, second.getAddress());
+}
+
+/*
  * Verify that multiple reads one after the other return expected data.
  */
 TEST_F(SynchronizedReadWrites, MultipleRead) {
@@ -403,9 +532,8 @@
     const size_t dataLen = chunkSize * chunkNum;
     ASSERT_LE(dataLen, mNumMessagesMax);
     uint8_t data[dataLen];
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+
+    initData(data, dataLen);
     ASSERT_TRUE(mQueue->write(data, dataLen));
     uint8_t readData[dataLen] = {};
     for (size_t i = 0; i < chunkNum; i++) {
@@ -423,9 +551,8 @@
     const size_t dataLen = chunkSize * chunkNum;
     ASSERT_LE(dataLen, mNumMessagesMax);
     uint8_t data[dataLen];
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+
+    initData(data, dataLen);
     for (unsigned int i = 0; i < chunkNum; i++) {
         ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
     }
@@ -440,13 +567,11 @@
  * Write mNumMessagesMax messages into the queue. This will cause a
  * wrap around. Read and verify the data.
  */
-TEST_F(SynchronizedReadWrites, ReadWriteWrapAround) {
-    size_t numMessages = mNumMessagesMax / 2;
+TEST_F(SynchronizedReadWrites, ReadWriteWrapAround1) {
+    size_t numMessages = mNumMessagesMax - 1;
     std::vector<uint8_t> data(mNumMessagesMax);
     std::vector<uint8_t> readData(mNumMessagesMax);
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], numMessages));
     ASSERT_TRUE(mQueue->read(&readData[0], numMessages));
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
@@ -455,15 +580,58 @@
 }
 
 /*
+ * Use beginRead/CommitRead/beginWrite/commitWrite APIs
+ * to test wrap arounds are handled correctly.
+ * Write enough messages into the FMQ to fill half of it
+ * and read back the same.
+ * Write mNumMessagesMax messages into the queue. This will cause a
+ * wrap around. Read and verify the data.
+ */
+TEST_F(SynchronizedReadWrites, ReadWriteWrapAround2) {
+    size_t dataLen = mNumMessagesMax - 1;
+    std::vector<uint8_t> data(mNumMessagesMax);
+    std::vector<uint8_t> readData(mNumMessagesMax);
+    initData(&data[0], mNumMessagesMax);
+    ASSERT_TRUE(mQueue->write(&data[0], dataLen));
+    ASSERT_TRUE(mQueue->read(&readData[0], dataLen));
+
+    /*
+     * The next write and read will have to deal with with wrap arounds.
+     */
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_TRUE(mQueue->beginWrite(mNumMessagesMax, &tx));
+
+    auto first = tx.getFirstRegion();
+    auto second = tx.getSecondRegion();
+
+    ASSERT_EQ(first.getLength() + second.getLength(), mNumMessagesMax);
+
+    ASSERT_TRUE(tx.copyTo(&data[0], 0 /* startIdx */,  mNumMessagesMax));
+
+    ASSERT_TRUE(mQueue->commitWrite(mNumMessagesMax));
+
+    ASSERT_TRUE(mQueue->beginRead(mNumMessagesMax, &tx));
+
+    first = tx.getFirstRegion();
+    second = tx.getSecondRegion();
+
+    ASSERT_EQ(first.getLength() + second.getLength(), mNumMessagesMax);
+
+    ASSERT_TRUE(tx.copyFrom(&readData[0], 0 /* startIdx */, mNumMessagesMax));
+    ASSERT_TRUE(mQueue->commitRead(mNumMessagesMax));
+
+    ASSERT_EQ(data, readData);
+}
+
+/*
  * Verify that a few bytes of data can be successfully written and read.
  */
 TEST_F(UnsynchronizedWrite, SmallInputTest1) {
     const size_t dataLen = 16;
     ASSERT_LE(dataLen, mNumMessagesMax);
     uint8_t data[dataLen];
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+
+    initData(data, dataLen);
     ASSERT_TRUE(mQueue->write(data, dataLen));
     uint8_t readData[dataLen] = {};
     ASSERT_TRUE(mQueue->read(readData, dataLen));
@@ -485,13 +653,11 @@
  * Write the queue when full. Verify that a subsequent writes is succesful.
  * Verify that availableToWrite() returns 0 as expected.
  */
-
-TEST_F(UnsynchronizedWrite, WriteWhenFull) {
+TEST_F(UnsynchronizedWrite, WriteWhenFull1) {
     ASSERT_EQ(0UL, mQueue->availableToRead());
     std::vector<uint8_t> data(mNumMessagesMax);
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
+
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
     ASSERT_EQ(0UL, mQueue->availableToWrite());
     ASSERT_TRUE(mQueue->write(&data[0], 1));
@@ -501,15 +667,36 @@
 }
 
 /*
+ * Write the queue when full. Verify that a subsequent writes
+ * using beginRead()/commitRead() is succesful.
+ * Verify that the next read fails as expected for unsynchronized flavor.
+ */
+TEST_F(UnsynchronizedWrite, WriteWhenFull2) {
+    ASSERT_EQ(0UL, mQueue->availableToRead());
+    std::vector<uint8_t> data(mNumMessagesMax);
+    ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
+
+    MessageQueueUnsync::MemTransaction tx;
+    ASSERT_TRUE(mQueue->beginWrite(1, &tx));
+
+    ASSERT_EQ(tx.getFirstRegion().getLength(), 1U);
+
+    ASSERT_TRUE(tx.copyTo(&data[0], 0 /* startIdx */));
+
+    ASSERT_TRUE(mQueue->commitWrite(1));
+
+    std::vector<uint8_t> readData(mNumMessagesMax);
+    ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
+}
+
+/*
  * Write a chunk of data equal to the queue size.
  * Verify that the write is successful and the subsequent read
  * returns the expected data.
  */
 TEST_F(UnsynchronizedWrite, LargeInputTest1) {
     std::vector<uint8_t> data(mNumMessagesMax);
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
     std::vector<uint8_t> readData(mNumMessagesMax);
     ASSERT_TRUE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -526,9 +713,7 @@
     const size_t dataLen = 4096;
     ASSERT_GT(dataLen, mNumMessagesMax);
     std::vector<uint8_t> data(dataLen);
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+    initData(&data[0], dataLen);
     ASSERT_FALSE(mQueue->write(&data[0], dataLen));
     std::vector<uint8_t> readData(mNumMessagesMax);
     ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -543,9 +728,7 @@
  */
 TEST_F(UnsynchronizedWrite, LargeInputTest3) {
     std::vector<uint8_t> data(mNumMessagesMax);
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
     ASSERT_TRUE(mQueue->write(&data[0], 1));
     std::vector<uint8_t> readData(mNumMessagesMax);
@@ -561,9 +744,7 @@
     const size_t dataLen = chunkSize * chunkNum;
     ASSERT_LE(dataLen, mNumMessagesMax);
     uint8_t data[dataLen];
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+    initData(data, dataLen);
     ASSERT_TRUE(mQueue->write(data, dataLen));
     uint8_t readData[dataLen] = {};
     for (size_t i = 0; i < chunkNum; i++) {
@@ -581,12 +762,12 @@
     const size_t dataLen = chunkSize * chunkNum;
     ASSERT_LE(dataLen, mNumMessagesMax);
     uint8_t data[dataLen];
-    for (size_t i = 0; i < dataLen; i++) {
-        data[i] = i & 0xFF;
-    }
+
+    initData(data, dataLen);
     for (size_t i = 0; i < chunkNum; i++) {
         ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
     }
+
     uint8_t readData[dataLen] = {};
     ASSERT_TRUE(mQueue->read(readData, dataLen));
     ASSERT_EQ(0, memcmp(readData, data, dataLen));
@@ -599,12 +780,11 @@
  * wrap around. Read and verify the data.
  */
 TEST_F(UnsynchronizedWrite, ReadWriteWrapAround) {
-    size_t numMessages = mNumMessagesMax / 2;
+    size_t numMessages = mNumMessagesMax - 1;
     std::vector<uint8_t> data(mNumMessagesMax);
     std::vector<uint8_t> readData(mNumMessagesMax);
-    for (size_t i = 0; i < mNumMessagesMax; i++) {
-        data[i] = i & 0xFF;
-    }
+
+    initData(&data[0], mNumMessagesMax);
     ASSERT_TRUE(mQueue->write(&data[0], numMessages));
     ASSERT_TRUE(mQueue->read(&readData[0], numMessages));
     ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));