Add capability to do zero-copy read/writes to FMQ
Test: FMQ unit tests
Bug: 33282647
Change-Id: Ieb05e6188554089ee7fa8bf4f499a5bb53861005
diff --git a/tests/mq_test.cpp b/tests/mq_test.cpp
index 3748cfe..2f03904 100644
--- a/tests/mq_test.cpp
+++ b/tests/mq_test.cpp
@@ -28,6 +28,11 @@
kFmqNotFull = 1 << 1,
};
+typedef android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>
+ MessageQueueSync;
+typedef android::hardware::MessageQueue<uint8_t, android::hardware::kUnsynchronizedWrite>
+ MessageQueueUnsync;
+
class SynchronizedReadWrites : public ::testing::Test {
protected:
virtual void TearDown() {
@@ -36,16 +41,14 @@
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2048;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<uint8_t,
- android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue);
+ mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
ASSERT_EQ(kNumElementsInQueue, mNumMessagesMax);
}
- android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>*
- mQueue = nullptr;
+ MessageQueueSync* mQueue = nullptr;
size_t mNumMessagesMax = 0;
};
@@ -57,16 +60,14 @@
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2048;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<uint8_t,
- android::hardware::kUnsynchronizedWrite>(kNumElementsInQueue);
+ mQueue = new (std::nothrow) MessageQueueUnsync(kNumElementsInQueue);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
ASSERT_EQ(kNumElementsInQueue, mNumMessagesMax);
}
- android::hardware::MessageQueue<uint8_t,
- android::hardware::kUnsynchronizedWrite>* mQueue = nullptr;
+ MessageQueueUnsync* mQueue = nullptr;
size_t mNumMessagesMax = 0;
};
@@ -77,8 +78,7 @@
}
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2048;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<
- uint8_t, android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue);
+ mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
@@ -89,7 +89,7 @@
std::atomic_init(&mFw, static_cast<uint32_t>(kFmqNotFull));
}
- android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>* mQueue;
+ MessageQueueSync* mQueue;
std::atomic<uint32_t> mFw;
size_t mNumMessagesMax = 0;
};
@@ -101,9 +101,8 @@
}
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2049;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<
- uint8_t, android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue,
- true /* configureEventFlagWord */);
+ mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue,
+ true /* configureEventFlagWord */);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
@@ -116,11 +115,20 @@
std::atomic_init(evFlagWordPtr, static_cast<uint32_t>(kFmqNotFull));
}
- android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>* mQueue;
+ MessageQueueSync* mQueue;
size_t mNumMessagesMax = 0;
};
/*
+ * Utility function to initialize data to be written to the FMQ
+ */
+inline void initData(uint8_t* data, size_t count) {
+ for (size_t i = 0; i < count; i++) {
+ data[i] = i & 0xFF;
+ }
+}
+
+/*
* This thread will attempt to read and block. When wait returns
* it checks if the kFmqNotEmpty bit is actually set.
* If the read is succesful, it signals Wake to kFmqNotFull.
@@ -297,9 +305,7 @@
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
@@ -308,9 +314,81 @@
}
/*
+ * Verify that a few bytes of data can be successfully written and read using
+ * beginRead/beginWrite/commitRead/commitWrite
+ */
+TEST_F(SynchronizedReadWrites, SmallInputTest2) {
+ const size_t dataLen = 16;
+ ASSERT_LE(dataLen, mNumMessagesMax);
+ uint8_t data[dataLen];
+
+ initData(data, dataLen);
+
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(dataLen, &tx));
+
+ ASSERT_TRUE(tx.copyTo(data, 0 /* startIdx */, dataLen));
+
+ ASSERT_TRUE(mQueue->commitWrite(dataLen));
+
+ uint8_t readData[dataLen] = {};
+
+ ASSERT_TRUE(mQueue->beginRead(dataLen, &tx));
+
+ ASSERT_TRUE(tx.copyFrom(readData, 0 /* startIdx */, dataLen));
+
+ ASSERT_TRUE(mQueue->commitRead(dataLen));
+
+ ASSERT_EQ(0, memcmp(data, readData, dataLen));
+}
+
+/*
+ * Verify that a few bytes of data can be successfully written and read using
+ * beginRead/beginWrite/commitRead/commitWrite as well as getSlot().
+ */
+TEST_F(SynchronizedReadWrites, SmallInputTest3) {
+ const size_t dataLen = 16;
+ ASSERT_LE(dataLen, mNumMessagesMax);
+ uint8_t data[dataLen];
+
+ initData(data, dataLen);
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(dataLen, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+
+ ASSERT_EQ(first.getLength() + second.getLength(), dataLen);
+ for (size_t i = 0; i < dataLen; i++) {
+ uint8_t* ptr = tx.getSlot(i);
+ *ptr = data[i];
+ }
+
+ ASSERT_TRUE(mQueue->commitWrite(dataLen));
+
+ uint8_t readData[dataLen] = {};
+
+ ASSERT_TRUE(mQueue->beginRead(dataLen, &tx));
+
+ first = tx.getFirstRegion();
+ second = tx.getSecondRegion();
+
+ ASSERT_EQ(first.getLength() + second.getLength(), dataLen);
+
+ for (size_t i = 0; i < dataLen; i++) {
+ uint8_t* ptr = tx.getSlot(i);
+ readData[i] = *ptr;
+ }
+
+ ASSERT_TRUE(mQueue->commitRead(dataLen));
+
+ ASSERT_EQ(0, memcmp(data, readData, dataLen));
+}
+
+/*
* Verify that read() returns false when trying to read from an empty queue.
*/
-TEST_F(SynchronizedReadWrites, ReadWhenEmpty) {
+TEST_F(SynchronizedReadWrites, ReadWhenEmpty1) {
ASSERT_EQ(0UL, mQueue->availableToRead());
const size_t dataLen = 2;
ASSERT_LE(dataLen, mNumMessagesMax);
@@ -319,18 +397,33 @@
}
/*
+ * Verify that beginRead() returns false and that the MemTransaction's region
+ * addresses are null when trying to read from an empty queue.
+ */
+TEST_F(SynchronizedReadWrites, ReadWhenEmpty2) {
+ ASSERT_EQ(0UL, mQueue->availableToRead());
+ const size_t dataLen = 2;
+ ASSERT_LE(dataLen, mNumMessagesMax);
+
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_FALSE(mQueue->beginRead(dataLen, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+
+ ASSERT_EQ(nullptr, first.getAddress());
+ ASSERT_EQ(nullptr, second.getAddress());
+}
+
+/*
* Write the queue until full. Verify that another write is unsuccessful.
* Verify that availableToWrite() returns 0 as expected.
*/
-
-TEST_F(SynchronizedReadWrites, WriteWhenFull) {
+TEST_F(SynchronizedReadWrites, WriteWhenFull1) {
ASSERT_EQ(0UL, mQueue->availableToRead());
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
-
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_FALSE(mQueue->write(&data[0], 1));
@@ -341,16 +434,35 @@
}
/*
+ * Write the queue until full. Verify that beginWrite() then returns false
+ * and that the MemTransaction's region addresses are null.
+ */
+TEST_F(SynchronizedReadWrites, WriteWhenFull2) {
+ ASSERT_EQ(0UL, mQueue->availableToRead());
+ std::vector<uint8_t> data(mNumMessagesMax);
+
+ initData(&data[0], mNumMessagesMax);
+ ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
+ ASSERT_EQ(0UL, mQueue->availableToWrite());
+
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_FALSE(mQueue->beginWrite(1, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+
+ ASSERT_EQ(nullptr, first.getAddress());
+ ASSERT_EQ(nullptr, second.getAddress());
+}
+
+/*
* Write a chunk of data equal to the queue size.
* Verify that the write is successful and the subsequent read
* returns the expected data.
*/
TEST_F(SynchronizedReadWrites, LargeInputTest1) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
-
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_TRUE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -367,9 +479,8 @@
const size_t dataLen = 4096;
ASSERT_GT(dataLen, mNumMessagesMax);
std::vector<uint8_t> data(dataLen);
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(&data[0], dataLen);
ASSERT_FALSE(mQueue->write(&data[0], dataLen));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -384,9 +495,7 @@
*/
TEST_F(SynchronizedReadWrites, LargeInputTest3) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_FALSE(mQueue->write(&data[0], 1));
std::vector<uint8_t> readData(mNumMessagesMax);
@@ -395,6 +504,26 @@
}
/*
+ * Verify that beginWrite() returns false and that the MemTransaction's
+ * region addresses are null when attempting to write data larger
+ * than the queue size.
+ */
+TEST_F(SynchronizedReadWrites, LargeInputTest4) {
+ ASSERT_EQ(0UL, mQueue->availableToRead());
+ const size_t dataLen = 4096;
+ ASSERT_GT(dataLen, mNumMessagesMax);
+
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_FALSE(mQueue->beginWrite(dataLen, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+
+ ASSERT_EQ(nullptr, first.getAddress());
+ ASSERT_EQ(nullptr, second.getAddress());
+}
+
+/*
* Verify that multiple reads one after the other return expected data.
*/
TEST_F(SynchronizedReadWrites, MultipleRead) {
@@ -403,9 +532,8 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
for (size_t i = 0; i < chunkNum; i++) {
@@ -423,9 +551,8 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
for (unsigned int i = 0; i < chunkNum; i++) {
ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
}
@@ -440,13 +567,11 @@
* Write mNumMessagesMax messages into the queue. This will cause a
* wrap around. Read and verify the data.
*/
-TEST_F(SynchronizedReadWrites, ReadWriteWrapAround) {
- size_t numMessages = mNumMessagesMax / 2;
+TEST_F(SynchronizedReadWrites, ReadWriteWrapAround1) {
+ size_t numMessages = mNumMessagesMax - 1;
std::vector<uint8_t> data(mNumMessagesMax);
std::vector<uint8_t> readData(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], numMessages));
ASSERT_TRUE(mQueue->read(&readData[0], numMessages));
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
@@ -455,15 +580,58 @@
}
/*
+ * Use beginRead/commitRead/beginWrite/commitWrite APIs
+ * to test wrap arounds are handled correctly.
+ * Write mNumMessagesMax - 1 messages into the FMQ (one fewer than it holds)
+ * and read back the same.
+ * Write mNumMessagesMax messages into the queue. This will cause a
+ * wrap around. Read and verify the data.
+ */
+TEST_F(SynchronizedReadWrites, ReadWriteWrapAround2) {
+ size_t dataLen = mNumMessagesMax - 1;
+ std::vector<uint8_t> data(mNumMessagesMax);
+ std::vector<uint8_t> readData(mNumMessagesMax);
+ initData(&data[0], mNumMessagesMax);
+ ASSERT_TRUE(mQueue->write(&data[0], dataLen));
+ ASSERT_TRUE(mQueue->read(&readData[0], dataLen));
+
+ /*
+ * The next write and read will have to deal with wrap arounds.
+ */
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(mNumMessagesMax, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+
+ ASSERT_EQ(first.getLength() + second.getLength(), mNumMessagesMax);
+
+ ASSERT_TRUE(tx.copyTo(&data[0], 0 /* startIdx */, mNumMessagesMax));
+
+ ASSERT_TRUE(mQueue->commitWrite(mNumMessagesMax));
+
+ ASSERT_TRUE(mQueue->beginRead(mNumMessagesMax, &tx));
+
+ first = tx.getFirstRegion();
+ second = tx.getSecondRegion();
+
+ ASSERT_EQ(first.getLength() + second.getLength(), mNumMessagesMax);
+
+ ASSERT_TRUE(tx.copyFrom(&readData[0], 0 /* startIdx */, mNumMessagesMax));
+ ASSERT_TRUE(mQueue->commitRead(mNumMessagesMax));
+
+ ASSERT_EQ(data, readData);
+}
+
+/*
* Verify that a few bytes of data can be successfully written and read.
*/
TEST_F(UnsynchronizedWrite, SmallInputTest1) {
const size_t dataLen = 16;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
ASSERT_TRUE(mQueue->read(readData, dataLen));
@@ -485,13 +653,11 @@
* Write the queue when full. Verify that a subsequent writes is succesful.
* Verify that availableToWrite() returns 0 as expected.
*/
-
-TEST_F(UnsynchronizedWrite, WriteWhenFull) {
+TEST_F(UnsynchronizedWrite, WriteWhenFull1) {
ASSERT_EQ(0UL, mQueue->availableToRead());
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_TRUE(mQueue->write(&data[0], 1));
@@ -501,15 +667,36 @@
}
/*
+ * Write the queue when full. Verify that a subsequent write
+ * using beginWrite()/commitWrite() is successful.
+ * Verify that the next read fails as expected for unsynchronized flavor.
+ */
+TEST_F(UnsynchronizedWrite, WriteWhenFull2) {
+ ASSERT_EQ(0UL, mQueue->availableToRead());
+ std::vector<uint8_t> data(mNumMessagesMax);
+ ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
+
+ MessageQueueUnsync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(1, &tx));
+
+ ASSERT_EQ(tx.getFirstRegion().getLength(), 1U);
+
+ ASSERT_TRUE(tx.copyTo(&data[0], 0 /* startIdx */));
+
+ ASSERT_TRUE(mQueue->commitWrite(1));
+
+ std::vector<uint8_t> readData(mNumMessagesMax);
+ ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
+}
+
+/*
* Write a chunk of data equal to the queue size.
* Verify that the write is successful and the subsequent read
* returns the expected data.
*/
TEST_F(UnsynchronizedWrite, LargeInputTest1) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_TRUE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -526,9 +713,7 @@
const size_t dataLen = 4096;
ASSERT_GT(dataLen, mNumMessagesMax);
std::vector<uint8_t> data(dataLen);
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], dataLen);
ASSERT_FALSE(mQueue->write(&data[0], dataLen));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -543,9 +728,7 @@
*/
TEST_F(UnsynchronizedWrite, LargeInputTest3) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_TRUE(mQueue->write(&data[0], 1));
std::vector<uint8_t> readData(mNumMessagesMax);
@@ -561,9 +744,7 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
for (size_t i = 0; i < chunkNum; i++) {
@@ -581,12 +762,12 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
for (size_t i = 0; i < chunkNum; i++) {
ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
}
+
uint8_t readData[dataLen] = {};
ASSERT_TRUE(mQueue->read(readData, dataLen));
ASSERT_EQ(0, memcmp(readData, data, dataLen));
@@ -599,12 +780,11 @@
* wrap around. Read and verify the data.
*/
TEST_F(UnsynchronizedWrite, ReadWriteWrapAround) {
- size_t numMessages = mNumMessagesMax / 2;
+ size_t numMessages = mNumMessagesMax - 1;
std::vector<uint8_t> data(mNumMessagesMax);
std::vector<uint8_t> readData(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], numMessages));
ASSERT_TRUE(mQueue->read(&readData[0], numMessages));
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));