Add capability to do zero-copy read/writes to FMQ am: 8f0e8e55c7 am: 9bd8ad98fd am: f52bc1cc33
am: e76733d7a8
Change-Id: Ifac33ee19ab9b79b22450a4fc090aa7f52a299fb
diff --git a/include/fmq/MessageQueue.h b/include/fmq/MessageQueue.h
index b458f39..1f90cb1 100644
--- a/include/fmq/MessageQueue.h
+++ b/include/fmq/MessageQueue.h
@@ -218,23 +218,194 @@
* word will be unmapped by the MessageQueue destructor.
*/
std::atomic<uint32_t>* getEventFlagWord() const { return mEvFlagWord; }
-private:
- struct region {
- uint8_t* address;
+
+ /**
+ * Describes a memory region in the FMQ.
+ */
+    struct MemRegion {
+        /* Builds an empty region: null base address and zero items. */
+        MemRegion() : MemRegion(nullptr, 0) {}
+
+        /* Builds a region of 'size' items of type T starting at 'base'. */
+        MemRegion(T* base, size_t size) : address(base), length(size) {}
+
+        /* Copies the base address and the item count from 'rhs'. */
+        MemRegion& operator=(const MemRegion& rhs) {
+            this->address = rhs.address;
+            this->length = rhs.length;
+            return *this;
+        }
+
+        /**
+         * Returns the base address of the MemRegion.
+         */
+        T* getAddress() const { return address; }
+
+        /**
+         * Returns the length of the MemRegion, counted in items of type T
+         * that can be read from/written into it.
+         */
+        size_t getLength() const { return length; }
+
+        /**
+         * Returns the length of the MemRegion expressed in bytes.
+         */
+        size_t getLengthInBytes() const { return sizeof(T) * length; }
+
+    private:
+        /* Base address */
+        T* address;
+
+        /*
+         * Count of items of type T that can be written to/read from the
+         * base address.
+         */
size_t length;
};
- struct transaction {
- region first;
- region second;
+
+ /**
+ * Describes the memory regions to be used for a read or write.
+ * The struct contains two MemRegion objects since the FMQ is a ring
+ * buffer and a read or write operation can wrap around. A single message
+ * of type T will never be broken between the two MemRegions.
+ */
+ struct MemTransaction {
+ MemTransaction() : MemTransaction(MemRegion(), MemRegion()) {}
+
+ MemTransaction(const MemRegion& regionFirst, const MemRegion& regionSecond) :
+ first(regionFirst), second(regionSecond) {}
+
+ MemTransaction& operator=(const MemTransaction &other) {
+ first = other.first;
+ second = other.second;
+ return *this;
+ }
+
+ /**
+ * Helper method to calculate the address for a particular index for
+ * the MemTransaction object.
+ *
+ * @param idx Index of the slot to be read/written. If the
+ * MemTransaction object is representing the memory region to read/write
+ * N items of type T, the valid range of idx is between 0 and N-1.
+ *
+ * @return Pointer to the slot idx. Will be nullptr for an invalid idx.
+ */
+ T* getSlot(size_t idx);
+
+ /**
+ * Helper method to write 'nMessages' items of type T into the memory
+ * regions described by the object starting from 'startIdx'. This method
+ * uses memcpy() and is not meant to be used for a zero copy operation.
+ * Partial writes are not supported.
+ *
+ * @param data Pointer to the source buffer.
+ * @param startIdx The slot number to begin the write from. If the
+ * MemTransaction object is representing the memory region to read/write
+ * N items of type T, the valid range of startIdx is between 0 and N-1.
+ * @param nMessages Number of items of type T.
+ *
+ * @return Whether the write operation of size 'nMessages' succeeded.
+ */
+ bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
+
+ /**
+ * Helper method to read 'nMessages' items of type T from the memory
+ * regions described by the object starting from 'startIdx'. This method uses
+ * memcpy() and is not meant to be used for a zero copy operation. Partial reads
+ * are not supported.
+ *
+ * @param data Pointer to the destination buffer.
+ * @param startIdx The slot number to begin the read from. If the
+ * MemTransaction object is representing the memory region to read/write
+ * N items of type T, the valid range of startIdx is between 0 and N-1.
+ * @param nMessages Number of items of type T.
+ *
+ * @return Whether the read operation of size 'nMessages' succeeded.
+ */
+ bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
+
+ /**
+ * Returns a const reference to the first MemRegion in the
+ * MemTransaction object.
+ */
+ inline const MemRegion& getFirstRegion() const { return first; }
+
+ /**
+ * Returns a const reference to the second MemRegion in the
+ * MemTransaction object.
+ */
+ inline const MemRegion& getSecondRegion() const { return second; }
+
+ private:
+ /*
+ * Given a start index ('idx') and the number of messages to be
+ * read/written, this helper method calculates the
+ * number of messages that should be read from/written to each of the first
+ * and second MemRegions and the base addresses to be used for
+ * the read/write operation.
+ *
+ * Returns false if 'idx' (the start index) and 'nMessages' are
+ * invalid for the MemTransaction object.
+ */
+ bool inline getMemRegionInfo(size_t idx,
+ size_t nMessages,
+ size_t& firstCount,
+ size_t& secondCount,
+ T** firstBaseAddress,
+ T** secondBaseAddress);
+ MemRegion first;
+ MemRegion second;
};
- size_t writeBytes(const uint8_t* data, size_t size);
- transaction beginWrite(size_t nBytesDesired) const;
- void commitWrite(size_t nBytesWritten);
+ /**
+ * Get a MemTransaction object to write 'nMessages' items of type T.
+ * Once the write is performed using the information from MemTransaction,
+ * the write operation is to be committed using a call to commitWrite().
+ *
+ * @param nMessages Number of messages of type T.
+ * @param memTx Pointer to MemTransaction struct that describes memory to write 'nMessages'
+ * items of type T. If a write of size 'nMessages' is not possible, the base
+ * addresses in the MemTransaction object would be set to nullptr.
+ *
+ * @return Whether it is possible to write 'nMessages' items of type T
+ * into the FMQ.
+ */
+ bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
- size_t readBytes(uint8_t* data, size_t size);
- transaction beginRead(size_t nBytesDesired) const;
- void commitRead(size_t nBytesRead);
+ /**
+ * Commit a write of size 'nMessages'. To be only used after a call to beginWrite().
+ *
+ * @param nMessages number of messages of type T to be written.
+ *
+ * @return Whether the write operation of size 'nMessages' succeeded.
+ */
+ bool commitWrite(size_t nMessages);
+
+ /**
+ * Get a MemTransaction object to read 'nMessages' items of type T.
+ * Once the read is performed using the information from MemTransaction,
+ * the read operation is to be committed using a call to commitRead().
+ *
+ * @param nMessages Number of messages of type T.
+ * @param memTx Pointer to MemTransaction struct that describes memory to read 'nMessages'
+ * items of type T. If a read of size 'nMessages' is not possible, the base
+ * pointers in the MemTransaction object returned will be set to nullptr.
+ *
+ * @return bool Whether it is possible to read 'nMessages' items of type T
+ * from the FMQ.
+ */
+ bool beginRead(size_t nMessages, MemTransaction* memTx) const;
+
+ /**
+ * Commit a read of size 'nMessages'. To be only used after a call to beginRead().
+ * For the unsynchronized flavor of FMQ, this method will return a failure
+ * if a write overflow happened after beginRead() was invoked.
+ *
+ * @param nMessages number of messages of type T to be read.
+ *
+ * @return bool Whether the read operation of size 'nMessages' succeeded.
+ */
+ bool commitRead(size_t nMessages);
+
+private:
size_t availableToWriteBytes() const;
size_t availableToReadBytes() const;
@@ -248,6 +419,10 @@
void initMemory(bool resetPointers);
enum DefaultEventNotification : uint32_t {
+ /*
+ * These are only used internally by the blockingRead()/blockingWrite()
+ * methods and hence other bit combinations are not required.
+ */
FMQ_NOT_FULL = 0x01,
FMQ_NOT_EMPTY = 0x02
};
@@ -270,6 +445,131 @@
};
template <typename T, MQFlavor flavor>
+/*
+ * Translates a slot index into a pointer inside the transaction.
+ *
+ * Slots [0, firstLength) live in the first MemRegion and slots
+ * [firstLength, firstLength + secondLength) live in the second one.
+ *
+ * @param idx Index of the slot to locate.
+ *
+ * @return Pointer to slot 'idx', or nullptr when 'idx' does not name a slot
+ * of this MemTransaction.
+ */
+T* MessageQueue<T, flavor>::MemTransaction::getSlot(size_t idx) {
+    size_t firstRegionLength = first.getLength();
+    size_t secondRegionLength = second.getLength();
+
+    /*
+     * The last valid index is (total length - 1). An index equal to the
+     * total length is one past the end and must be rejected as well,
+     * otherwise the caller would get a pointer it is not allowed to use.
+     */
+    if (idx >= firstRegionLength + secondRegionLength) {
+        return nullptr;
+    }
+
+    if (idx < firstRegionLength) {
+        return first.getAddress() + idx;
+    }
+
+    return second.getAddress() + idx - firstRegionLength;
+}
+
+template <typename T, MQFlavor flavor>
+/*
+ * Splits the range of 'nMessages' slots starting at 'startIdx' between the
+ * first and second MemRegions.
+ *
+ * On success 'firstCount' and 'secondCount' receive the number of messages
+ * that fall into each region. '*firstBaseAddress' / '*secondBaseAddress' are
+ * written only when the corresponding count is non-zero.
+ *
+ * Returns false, leaving all outputs untouched, if the range does not fit
+ * into the MemTransaction object.
+ */
+bool MessageQueue<T, flavor>::MemTransaction::getMemRegionInfo(size_t startIdx,
+                                                               size_t nMessages,
+                                                               size_t& firstCount,
+                                                               size_t& secondCount,
+                                                               T** firstBaseAddress,
+                                                               T** secondBaseAddress) {
+    size_t firstRegionLength = first.getLength();
+    size_t totalLength = firstRegionLength + second.getLength();
+
+    /*
+     * Return false if 'nMessages' starting at 'startIdx' cannot be
+     * accommodated by the MemTransaction object. The check compares against
+     * the remaining capacity rather than computing startIdx + nMessages,
+     * because that sum can wrap around for very large arguments and would
+     * then slip past the bounds check.
+     */
+    if (startIdx > totalLength || nMessages > totalLength - startIdx) {
+        return false;
+    }
+
+    /* Number of messages to be read/written to the first MemRegion. */
+    firstCount = startIdx < firstRegionLength ?
+            std::min(nMessages, firstRegionLength - startIdx) : 0;
+
+    /* Number of messages to be read/written to the second MemRegion. */
+    secondCount = nMessages - firstCount;
+
+    if (firstCount != 0) {
+        *firstBaseAddress = first.getAddress() + startIdx;
+    }
+
+    if (secondCount != 0) {
+        /* Offset inside the second region; 0 when the range starts in the first one. */
+        size_t secondStartIdx = startIdx > firstRegionLength ? startIdx - firstRegionLength : 0;
+        *secondBaseAddress = second.getAddress() + secondStartIdx;
+    }
+
+    return true;
+}
+
+template <typename T, MQFlavor flavor>
+bool MessageQueue<T, flavor>::MemTransaction::copyFrom(T* data, size_t startIdx, size_t nMessages) {
+    /* A destination buffer is mandatory. */
+    if (!data) {
+        return false;
+    }
+
+    size_t countInFirst = 0;
+    size_t countInSecond = 0;
+    T* srcInFirst = nullptr;
+    T* srcInSecond = nullptr;
+
+    /*
+     * Work out how the requested range maps onto the two MemRegions; bail
+     * out if 'startIdx' and 'nMessages' do not fit this MemTransaction.
+     */
+    const bool rangeFits = getMemRegionInfo(startIdx, nMessages, countInFirst, countInSecond,
+                                            &srcInFirst, &srcInSecond);
+    if (!rangeFits) {
+        return false;
+    }
+
+    /* Part of the range that lies in the first MemRegion. */
+    if (countInFirst > 0) {
+        memcpy(data, srcInFirst, countInFirst * sizeof(T));
+    }
+
+    /* Remainder, taken from the second MemRegion and appended in 'data'. */
+    if (countInSecond > 0) {
+        memcpy(data + countInFirst, srcInSecond, countInSecond * sizeof(T));
+    }
+
+    return true;
+}
+
+template <typename T, MQFlavor flavor>
+bool MessageQueue<T, flavor>::MemTransaction::copyTo(const T* data,
+                                                     size_t startIdx,
+                                                     size_t nMessages) {
+    /* A source buffer is mandatory. */
+    if (!data) {
+        return false;
+    }
+
+    size_t countInFirst = 0;
+    size_t countInSecond = 0;
+    T* dstInFirst = nullptr;
+    T* dstInSecond = nullptr;
+
+    /*
+     * Work out how the requested range maps onto the two MemRegions; bail
+     * out if 'startIdx' and 'nMessages' do not fit this MemTransaction.
+     */
+    const bool rangeFits = getMemRegionInfo(startIdx, nMessages, countInFirst, countInSecond,
+                                            &dstInFirst, &dstInSecond);
+    if (!rangeFits) {
+        return false;
+    }
+
+    /* Leading part of 'data' goes into the first MemRegion. */
+    if (countInFirst > 0) {
+        memcpy(dstInFirst, data, countInFirst * sizeof(T));
+    }
+
+    /* Whatever is left of 'data' goes into the second MemRegion. */
+    if (countInSecond > 0) {
+        memcpy(dstInSecond, data + countInFirst, countInSecond * sizeof(T));
+    }
+
+    return true;
+}
+
+template <typename T, MQFlavor flavor>
void MessageQueue<T, flavor>::initMemory(bool resetPointers) {
/*
* Verify that the the Descriptor contains the minimum number of grantors
@@ -405,17 +705,11 @@
}
template <typename T, MQFlavor flavor>
-bool MessageQueue<T, flavor>::write(const T* data, size_t count) {
- /*
- * If read/write synchronization is not enabled, data in the queue
- * will be overwritten by a write operation when full.
- */
- if ((flavor == kSynchronizedReadWrite && (availableToWriteBytes() < sizeof(T) * count)) ||
- (count > getQuantumCount()))
- return false;
-
- return (writeBytes(reinterpret_cast<const uint8_t*>(data),
- sizeof(T) * count) == sizeof(T) * count);
+bool MessageQueue<T, flavor>::write(const T* data, size_t nMessages) {
+    /*
+     * Copying write built from the transaction API: reserve the space,
+     * copy the payload in, then publish it. Any failing step aborts the
+     * write and the later steps are not attempted.
+     */
+    MemTransaction tx;
+    if (!beginWrite(nMessages, &tx)) {
+        return false;
+    }
+
+    if (!tx.copyTo(data, 0 /* startIdx */, nMessages)) {
+        return false;
+    }
+
+    return commitWrite(nMessages);
}
template <typename T, MQFlavor flavor>
@@ -630,9 +924,108 @@
}
template <typename T, MQFlavor flavor>
+size_t MessageQueue<T, flavor>::availableToWriteBytes() const {
+    /* Queue size minus the amount that is currently available to read. */
+    const auto queueSize = mDesc->getSize();
+    return queueSize - availableToReadBytes();
+}
+
+template <typename T, MQFlavor flavor>
+size_t MessageQueue<T, flavor>::availableToWrite() const {
+    /* Convert the writable byte count into a count of items of type T. */
+    const size_t writableBytes = availableToWriteBytes();
+    return writableBytes / sizeof(T);
+}
+
+template <typename T, MQFlavor flavor>
+size_t MessageQueue<T, flavor>::availableToRead() const {
+    /* Convert the readable byte count into a count of items of type T. */
+    const size_t readableBytes = availableToReadBytes();
+    return readableBytes / sizeof(T);
+}
+
+template <typename T, MQFlavor flavor>
+bool MessageQueue<T, flavor>::beginWrite(size_t nMessages, MemTransaction* result) const {
+    /*
+     * Refuse the request, handing back a MemTransaction with null
+     * addresses, when the synchronized flavor lacks room for nMessages or
+     * when nMessages is larger than the FMQ itself.
+     */
+    if ((flavor == kSynchronizedReadWrite && (availableToWrite() < nMessages)) ||
+        nMessages > getQuantumCount()) {
+        *result = MemTransaction();
+        return false;
+    }
+
+    auto writePtr = mWritePtr->load(std::memory_order_relaxed);
+    const auto queueSize = mDesc->getSize();
+    const size_t writeOffset = writePtr % queueSize;
+    T* const writeBase = reinterpret_cast<T*>(mRing + writeOffset);
+
+    /*
+     * Number of messages that fit between writeOffset and the end of the
+     * ring buffer, i.e. that can be written without wrapping around.
+     */
+    const size_t messagesBeforeWrap = (queueSize - writeOffset) / sizeof(T);
+
+    if (nMessages <= messagesBeforeWrap) {
+        /*
+         * Everything fits contiguously: only the first region is
+         * populated, the second one stays empty.
+         */
+        *result = MemTransaction(MemRegion(writeBase, nMessages), MemRegion());
+    } else {
+        /*
+         * The write wraps around: the first region runs up to the end of
+         * the ring and the second one restarts at the beginning of it.
+         */
+        *result = MemTransaction(MemRegion(writeBase, messagesBeforeWrap),
+                                 MemRegion(reinterpret_cast<T*>(mRing),
+                                           nMessages - messagesBeforeWrap));
+    }
+
+    return true;
+}
+
+template <typename T, MQFlavor flavor>
+/*
+ * Publishes 'nMessages' written items by advancing the write counter by their size in bytes.
+ * Integer sanitization is disabled since integer overflow here is allowed and legal.
+ */
__attribute__((no_sanitize("integer")))
-bool MessageQueue<T, flavor>::read(T* data, size_t count) {
- if (availableToReadBytes() < sizeof(T) * count) return false;
+bool MessageQueue<T, flavor>::commitWrite(size_t nMessages) {
+ size_t nBytesWritten = nMessages * sizeof(T);
+ auto writePtr = mWritePtr->load(std::memory_order_relaxed);
+ writePtr += nBytesWritten;
+ mWritePtr->store(writePtr, std::memory_order_release);
+ /*
+ * This method cannot fail now since we are only incrementing the writePtr
+ * counter; the bool return value is therefore always true.
+ */
+ return true;
+}
+
+template <typename T, MQFlavor flavor>
+size_t MessageQueue<T, flavor>::availableToReadBytes() const {
+ /*
+ * This method is invoked by implementations of both read() and write() and
+ * hence requires a memory_order_acquire load for both mReadPtr and
+ * mWritePtr. The result is the write counter minus the read counter.
+ */
+ return mWritePtr->load(std::memory_order_acquire) -
+ mReadPtr->load(std::memory_order_acquire);
+}
+
+template <typename T, MQFlavor flavor>
+bool MessageQueue<T, flavor>::read(T* data, size_t nMessages) {
+    /*
+     * Copying read built from the transaction API: locate the data, copy
+     * it out, then consume it. Any failing step aborts the read and the
+     * later steps are not attempted.
+     */
+    MemTransaction tx;
+    if (!beginRead(nMessages, &tx)) {
+        return false;
+    }
+
+    if (!tx.copyFrom(data, 0 /* startIdx */, nMessages)) {
+        return false;
+    }
+
+    return commitRead(nMessages);
+}
+
+template <typename T, MQFlavor flavor>
+/*
+ * Disable integer sanitization since integer overflow here is allowed
+ * and legal.
+ */
+__attribute__((no_sanitize("integer")))
+bool MessageQueue<T, flavor>::beginRead(size_t nMessages, MemTransaction* result) const {
+ *result = MemTransaction();
/*
* If it is detected that the data in the queue was overwritten
* due to the reader process being too slow, the read pointer counter
@@ -652,118 +1045,65 @@
return false;
}
- return readBytes(reinterpret_cast<uint8_t*>(data), sizeof(T) * count) ==
- sizeof(T) * count;
-}
-
-template <typename T, MQFlavor flavor>
-size_t MessageQueue<T, flavor>::availableToWriteBytes() const {
- return mDesc->getSize() - availableToReadBytes();
-}
-
-template <typename T, MQFlavor flavor>
-size_t MessageQueue<T, flavor>::availableToWrite() const {
- return availableToWriteBytes()/sizeof(T);
-}
-
-template <typename T, MQFlavor flavor>
-size_t MessageQueue<T, flavor>::availableToRead() const {
- return availableToReadBytes()/sizeof(T);
-}
-
-template <typename T, MQFlavor flavor>
-size_t MessageQueue<T, flavor>::writeBytes(const uint8_t* data, size_t size) {
- transaction tx = beginWrite(size);
- memcpy(tx.first.address, data, tx.first.length);
- memcpy(tx.second.address, data + tx.first.length, tx.second.length);
- size_t result = tx.first.length + tx.second.length;
- commitWrite(result);
- return result;
-}
-
-/*
- * The below method does not check for available space since it was already
- * checked by write() API which invokes writeBytes() which in turn calls
- * beginWrite().
- */
-template <typename T, MQFlavor flavor>
-typename MessageQueue<T, flavor>::transaction MessageQueue<T, flavor>::beginWrite(
- size_t nBytesDesired) const {
- transaction result;
- auto writePtr = mWritePtr->load(std::memory_order_relaxed);
- size_t writeOffset = writePtr % mDesc->getSize();
- size_t contiguous = mDesc->getSize() - writeOffset;
- if (contiguous < nBytesDesired) {
- result = {{mRing + writeOffset, contiguous},
- {mRing, nBytesDesired - contiguous}};
- } else {
- result = {
- {mRing + writeOffset, nBytesDesired}, {0, 0},
- };
- }
- return result;
-}
-
-template <typename T, MQFlavor flavor>
-__attribute__((no_sanitize("integer")))
-void MessageQueue<T, flavor>::commitWrite(size_t nBytesWritten) {
- auto writePtr = mWritePtr->load(std::memory_order_relaxed);
- writePtr += nBytesWritten;
- mWritePtr->store(writePtr, std::memory_order_release);
-}
-
-template <typename T, MQFlavor flavor>
-size_t MessageQueue<T, flavor>::availableToReadBytes() const {
+ size_t nBytesDesired = nMessages * sizeof(T);
/*
- * This method is invoked by implementations of both read() and write() and
- * hence requries a memory_order_acquired load for both mReadPtr and
- * mWritePtr.
+ * Return if insufficient data to read in FMQ.
*/
- return mWritePtr->load(std::memory_order_acquire) -
- mReadPtr->load(std::memory_order_acquire);
-}
-
-template <typename T, MQFlavor flavor>
-size_t MessageQueue<T, flavor>::readBytes(uint8_t* data, size_t size) {
- transaction tx = beginRead(size);
- memcpy(data, tx.first.address, tx.first.length);
- memcpy(data + tx.first.length, tx.second.address, tx.second.length);
- size_t result = tx.first.length + tx.second.length;
- commitRead(result);
- return result;
-}
-
-/*
- * The below method does not check whether nBytesDesired bytes are available
- * to read because the check is performed in the read() method before
- * readBytes() is invoked.
- */
-template <typename T, MQFlavor flavor>
-typename MessageQueue<T, flavor>::transaction MessageQueue<T, flavor>::beginRead(
- size_t nBytesDesired) const {
- transaction result;
- auto readPtr = mReadPtr->load(std::memory_order_relaxed);
- size_t readOffset = readPtr % mDesc->getSize();
- size_t contiguous = mDesc->getSize() - readOffset;
-
- if (contiguous < nBytesDesired) {
- result = {{mRing + readOffset, contiguous},
- {mRing, nBytesDesired - contiguous}};
- } else {
- result = {
- {mRing + readOffset, nBytesDesired}, {0, 0},
- };
+ if (writePtr - readPtr < nBytesDesired) {
+ return false;
}
- return result;
+ size_t readOffset = readPtr % mDesc->getSize();
+ /*
+ * From readOffset, the number of messages that can be read contiguously
+ * without wrapping around the ring buffer are calculated.
+ */
+ size_t contiguousMessages = (mDesc->getSize() - readOffset) / sizeof(T);
+
+ if (contiguousMessages < nMessages) {
+ /*
+ * A wrap around is required. Both result.first and result.second
+ * are populated.
+ */
+ *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset),
+ contiguousMessages),
+ MemRegion(reinterpret_cast<T*>(mRing),
+ nMessages - contiguousMessages));
+ } else {
+ /*
+ * A wrap around is not required. Only result.first need to be
+ * populated.
+ */
+ *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset), nMessages),
+ MemRegion());
+ }
+
+ return true;
}
template <typename T, MQFlavor flavor>
+/*
+ * Consumes 'nMessages' read items by advancing the read counter by their size in bytes.
+ * Integer sanitization is disabled since integer overflow here is allowed and legal.
+ */
__attribute__((no_sanitize("integer")))
-void MessageQueue<T, flavor>::commitRead(size_t nBytesRead) {
+bool MessageQueue<T, flavor>::commitRead(size_t nMessages) {
+ // TODO: Use a local copy of readPtr to avoid relaxed mReadPtr loads.
auto readPtr = mReadPtr->load(std::memory_order_relaxed);
+ auto writePtr = mWritePtr->load(std::memory_order_acquire);
+ /*
+ * If the flavor is unsynchronized, it is possible that a write overflow may
+ * have occurred between beginRead() and commitRead(). In that case the read counter is
+ * set to the write counter and the commit is reported as failed.
+ */
+ if (writePtr - readPtr > mDesc->getSize()) {
+ mReadPtr->store(writePtr, std::memory_order_release);
+ return false;
+ }
+
+ size_t nBytesRead = nMessages * sizeof(T);
readPtr += nBytesRead;
mReadPtr->store(readPtr, std::memory_order_release);
+ return true;
}
template <typename T, MQFlavor flavor>
diff --git a/tests/Android.mk b/tests/Android.mk
index 775716b..2e61d15 100644
--- a/tests/Android.mk
+++ b/tests/Android.mk
@@ -29,7 +29,7 @@
libcutils \
libutils \
libfmq
-
+LOCAL_CFLAGS := -Wall -Werror
LOCAL_SHARED_LIBRARIES += android.hardware.tests.msgq@1.0
include $(BUILD_EXECUTABLE)
@@ -46,7 +46,7 @@
libbase \
libfmq \
liblog
-
+LOCAL_CFLAGS := -Wall -Werror
LOCAL_SHARED_LIBRARIES += android.hardware.tests.msgq@1.0 libfmq
LOCAL_MODULE := mq_test_client
include $(BUILD_NATIVE_TEST)
@@ -62,5 +62,6 @@
libbase \
libfmq
LOCAL_MODULE := mq_test
+LOCAL_CFLAGS := -Wall -Werror
include $(BUILD_NATIVE_TEST)
diff --git a/tests/mq_test.cpp b/tests/mq_test.cpp
index 3748cfe..2f03904 100644
--- a/tests/mq_test.cpp
+++ b/tests/mq_test.cpp
@@ -28,6 +28,11 @@
kFmqNotFull = 1 << 1,
};
+typedef android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>
+ MessageQueueSync;
+typedef android::hardware::MessageQueue<uint8_t, android::hardware::kUnsynchronizedWrite>
+ MessageQueueUnsync;
+
class SynchronizedReadWrites : public ::testing::Test {
protected:
virtual void TearDown() {
@@ -36,16 +41,14 @@
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2048;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<uint8_t,
- android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue);
+ mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
ASSERT_EQ(kNumElementsInQueue, mNumMessagesMax);
}
- android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>*
- mQueue = nullptr;
+ MessageQueueSync* mQueue = nullptr;
size_t mNumMessagesMax = 0;
};
@@ -57,16 +60,14 @@
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2048;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<uint8_t,
- android::hardware::kUnsynchronizedWrite>(kNumElementsInQueue);
+ mQueue = new (std::nothrow) MessageQueueUnsync(kNumElementsInQueue);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
ASSERT_EQ(kNumElementsInQueue, mNumMessagesMax);
}
- android::hardware::MessageQueue<uint8_t,
- android::hardware::kUnsynchronizedWrite>* mQueue = nullptr;
+ MessageQueueUnsync* mQueue = nullptr;
size_t mNumMessagesMax = 0;
};
@@ -77,8 +78,7 @@
}
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2048;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<
- uint8_t, android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue);
+ mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
@@ -89,7 +89,7 @@
std::atomic_init(&mFw, static_cast<uint32_t>(kFmqNotFull));
}
- android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>* mQueue;
+ MessageQueueSync* mQueue;
std::atomic<uint32_t> mFw;
size_t mNumMessagesMax = 0;
};
@@ -101,9 +101,8 @@
}
virtual void SetUp() {
static constexpr size_t kNumElementsInQueue = 2049;
- mQueue = new (std::nothrow) android::hardware::MessageQueue<
- uint8_t, android::hardware::kSynchronizedReadWrite>(kNumElementsInQueue,
- true /* configureEventFlagWord */);
+ mQueue = new (std::nothrow) MessageQueueSync(kNumElementsInQueue,
+ true /* configureEventFlagWord */);
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
mNumMessagesMax = mQueue->getQuantumCount();
@@ -116,11 +115,20 @@
std::atomic_init(evFlagWordPtr, static_cast<uint32_t>(kFmqNotFull));
}
- android::hardware::MessageQueue<uint8_t, android::hardware::kSynchronizedReadWrite>* mQueue;
+ MessageQueueSync* mQueue;
size_t mNumMessagesMax = 0;
};
/*
+ * Utility function to initialize data to be written to the FMQ
+ */
+inline void initData(uint8_t* data, size_t count) {
+    /* Fill the buffer with the repeating byte pattern 0, 1, ..., 255, 0, ... */
+    for (size_t idx = 0; idx != count; ++idx) {
+        data[idx] = static_cast<uint8_t>(idx & 0xFF);
+    }
+}
+
+/*
* This thread will attempt to read and block. When wait returns
* it checks if the kFmqNotEmpty bit is actually set.
* If the read is succesful, it signals Wake to kFmqNotFull.
@@ -297,9 +305,7 @@
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
@@ -308,9 +314,81 @@
}
/*
+ * Verify that a few bytes of data can be successfully written and read using
+ * beginRead/beginWrite/CommitRead/CommitWrite
+ */
+TEST_F(SynchronizedReadWrites, SmallInputTest2) {
+    const size_t dataLen = 16;
+    ASSERT_LE(dataLen, mNumMessagesMax);
+
+    uint8_t data[dataLen];
+    initData(data, dataLen);
+
+    /* Write path: reserve, copy in, commit. */
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_TRUE(mQueue->beginWrite(dataLen, &tx));
+    ASSERT_TRUE(tx.copyTo(data, 0 /* startIdx */, dataLen));
+    ASSERT_TRUE(mQueue->commitWrite(dataLen));
+
+    /* Read path: locate, copy out, commit. The transaction object is reused. */
+    uint8_t readData[dataLen] = {};
+    ASSERT_TRUE(mQueue->beginRead(dataLen, &tx));
+    ASSERT_TRUE(tx.copyFrom(readData, 0 /* startIdx */, dataLen));
+    ASSERT_TRUE(mQueue->commitRead(dataLen));
+
+    /* What was read must match what was written. */
+    ASSERT_EQ(0, memcmp(data, readData, dataLen));
+}
+
+/*
+ * Verify that a few bytes of data can be successfully written and read using
+ * beginRead/beginWrite/CommitRead/CommitWrite as well as getSlot().
+ */
+TEST_F(SynchronizedReadWrites, SmallInputTest3) {
+    const size_t dataLen = 16;
+    ASSERT_LE(dataLen, mNumMessagesMax);
+    uint8_t data[dataLen];
+
+    initData(data, dataLen);
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_TRUE(mQueue->beginWrite(dataLen, &tx));
+
+    auto first = tx.getFirstRegion();
+    auto second = tx.getSecondRegion();
+
+    ASSERT_EQ(first.getLength() + second.getLength(), dataLen);
+    for (size_t i = 0; i < dataLen; i++) {
+        uint8_t* ptr = tx.getSlot(i);
+        /*
+         * getSlot() signals an invalid index with nullptr; fail the test
+         * cleanly instead of crashing on a null dereference.
+         */
+        ASSERT_NE(nullptr, ptr);
+        *ptr = data[i];
+    }
+
+    ASSERT_TRUE(mQueue->commitWrite(dataLen));
+
+    uint8_t readData[dataLen] = {};
+
+    ASSERT_TRUE(mQueue->beginRead(dataLen, &tx));
+
+    first = tx.getFirstRegion();
+    second = tx.getSecondRegion();
+
+    ASSERT_EQ(first.getLength() + second.getLength(), dataLen);
+
+    for (size_t i = 0; i < dataLen; i++) {
+        uint8_t* ptr = tx.getSlot(i);
+        /* Same guard on the read side before the slot is dereferenced. */
+        ASSERT_NE(nullptr, ptr);
+        readData[i] = *ptr;
+    }
+
+    ASSERT_TRUE(mQueue->commitRead(dataLen));
+
+    /* Data read slot by slot must match the data written slot by slot. */
+    ASSERT_EQ(0, memcmp(data, readData, dataLen));
+}
+
+/*
* Verify that read() returns false when trying to read from an empty queue.
*/
-TEST_F(SynchronizedReadWrites, ReadWhenEmpty) {
+TEST_F(SynchronizedReadWrites, ReadWhenEmpty1) {
ASSERT_EQ(0UL, mQueue->availableToRead());
const size_t dataLen = 2;
ASSERT_LE(dataLen, mNumMessagesMax);
@@ -319,18 +397,33 @@
}
/*
+ * Verify that beginRead() returns a MemTransaction object with null pointers when trying
+ * to read from an empty queue.
+ */
+TEST_F(SynchronizedReadWrites, ReadWhenEmpty2) {
+    /* Precondition: nothing has been written to the queue yet. */
+    ASSERT_EQ(0UL, mQueue->availableToRead());
+    const size_t dataLen = 2;
+    ASSERT_LE(dataLen, mNumMessagesMax);
+
+    /* beginRead() must fail on an empty queue... */
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_FALSE(mQueue->beginRead(dataLen, &tx));
+
+    /* ...and leave both regions of the transaction without a base address. */
+    const auto firstRegion = tx.getFirstRegion();
+    const auto secondRegion = tx.getSecondRegion();
+    ASSERT_EQ(nullptr, firstRegion.getAddress());
+    ASSERT_EQ(nullptr, secondRegion.getAddress());
+}
+
+/*
* Write the queue until full. Verify that another write is unsuccessful.
* Verify that availableToWrite() returns 0 as expected.
*/
-
-TEST_F(SynchronizedReadWrites, WriteWhenFull) {
+TEST_F(SynchronizedReadWrites, WriteWhenFull1) {
ASSERT_EQ(0UL, mQueue->availableToRead());
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
-
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_FALSE(mQueue->write(&data[0], 1));
@@ -341,16 +434,35 @@
}
/*
+ * Write the queue until full. Verify that beginWrite() returns
+ * a MemTransaction object with null base pointers.
+ */
+TEST_F(SynchronizedReadWrites, WriteWhenFull2) {
+    ASSERT_EQ(0UL, mQueue->availableToRead());
+
+    /* Fill the queue completely. */
+    std::vector<uint8_t> data(mNumMessagesMax);
+    initData(data.data(), mNumMessagesMax);
+    ASSERT_TRUE(mQueue->write(data.data(), mNumMessagesMax));
+    ASSERT_EQ(0UL, mQueue->availableToWrite());
+
+    /* With no room left, even a one-item beginWrite() must fail... */
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_FALSE(mQueue->beginWrite(1, &tx));
+
+    /* ...and leave both regions of the transaction without a base address. */
+    const auto firstRegion = tx.getFirstRegion();
+    const auto secondRegion = tx.getSecondRegion();
+    ASSERT_EQ(nullptr, firstRegion.getAddress());
+    ASSERT_EQ(nullptr, secondRegion.getAddress());
+}
+
+/*
* Write a chunk of data equal to the queue size.
* Verify that the write is successful and the subsequent read
* returns the expected data.
*/
TEST_F(SynchronizedReadWrites, LargeInputTest1) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
-
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_TRUE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -367,9 +479,8 @@
const size_t dataLen = 4096;
ASSERT_GT(dataLen, mNumMessagesMax);
std::vector<uint8_t> data(dataLen);
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(&data[0], dataLen);
ASSERT_FALSE(mQueue->write(&data[0], dataLen));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -384,9 +495,7 @@
*/
TEST_F(SynchronizedReadWrites, LargeInputTest3) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_FALSE(mQueue->write(&data[0], 1));
std::vector<uint8_t> readData(mNumMessagesMax);
@@ -395,6 +504,26 @@
}
/*
+ * Verify that beginWrite() returns a MemTransaction with
+ * null base pointers when attempting to write data larger
+ * than the queue size.
+ */
+TEST_F(SynchronizedReadWrites, LargeInputTest4) {
+    ASSERT_EQ(0UL, mQueue->availableToRead());
+
+    /* Pick a request that is larger than the whole queue. */
+    const size_t dataLen = 4096;
+    ASSERT_GT(dataLen, mNumMessagesMax);
+
+    /* Such a beginWrite() must fail... */
+    MessageQueueSync::MemTransaction tx;
+    ASSERT_FALSE(mQueue->beginWrite(dataLen, &tx));
+
+    /* ...and leave both regions of the transaction without a base address. */
+    const auto firstRegion = tx.getFirstRegion();
+    const auto secondRegion = tx.getSecondRegion();
+    ASSERT_EQ(nullptr, firstRegion.getAddress());
+    ASSERT_EQ(nullptr, secondRegion.getAddress());
+}
+
+/*
* Verify that multiple reads one after the other return expected data.
*/
TEST_F(SynchronizedReadWrites, MultipleRead) {
@@ -403,9 +532,8 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
for (size_t i = 0; i < chunkNum; i++) {
@@ -423,9 +551,8 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
for (unsigned int i = 0; i < chunkNum; i++) {
ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
}
@@ -440,13 +567,11 @@
* Write mNumMessagesMax messages into the queue. This will cause a
* wrap around. Read and verify the data.
*/
-TEST_F(SynchronizedReadWrites, ReadWriteWrapAround) {
- size_t numMessages = mNumMessagesMax / 2;
+TEST_F(SynchronizedReadWrites, ReadWriteWrapAround1) {
+ size_t numMessages = mNumMessagesMax - 1;
std::vector<uint8_t> data(mNumMessagesMax);
std::vector<uint8_t> readData(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], numMessages));
ASSERT_TRUE(mQueue->read(&readData[0], numMessages));
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
@@ -455,15 +580,58 @@
}
/*
+ * Use beginRead/commitRead/beginWrite/commitWrite APIs
+ * to test wrap arounds are handled correctly.
+ * Write mNumMessagesMax - 1 messages into the FMQ
+ * and read back the same.
+ * Write mNumMessagesMax messages into the queue. This will cause a
+ * wrap around. Read and verify the data.
+ */
+TEST_F(SynchronizedReadWrites, ReadWriteWrapAround2) {
+ size_t dataLen = mNumMessagesMax - 1;
+ std::vector<uint8_t> data(mNumMessagesMax);
+ std::vector<uint8_t> readData(mNumMessagesMax);
+ initData(&data[0], mNumMessagesMax);
+ ASSERT_TRUE(mQueue->write(&data[0], dataLen));
+ ASSERT_TRUE(mQueue->read(&readData[0], dataLen));
+
+ /*
+ * The next write and read will have to deal with wrap arounds.
+ */
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(mNumMessagesMax, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+
+ ASSERT_EQ(first.getLength() + second.getLength(), mNumMessagesMax); // both regions together must cover the whole write
+
+ ASSERT_TRUE(tx.copyTo(&data[0], 0 /* startIdx */, mNumMessagesMax));
+
+ ASSERT_TRUE(mQueue->commitWrite(mNumMessagesMax));
+
+ ASSERT_TRUE(mQueue->beginRead(mNumMessagesMax, &tx));
+
+ first = tx.getFirstRegion();
+ second = tx.getSecondRegion();
+
+ ASSERT_EQ(first.getLength() + second.getLength(), mNumMessagesMax); // both regions together must cover the whole read
+
+ ASSERT_TRUE(tx.copyFrom(&readData[0], 0 /* startIdx */, mNumMessagesMax));
+ ASSERT_TRUE(mQueue->commitRead(mNumMessagesMax));
+
+ ASSERT_EQ(data, readData);
+}
+
+/*
* Verify that a few bytes of data can be successfully written and read.
*/
TEST_F(UnsynchronizedWrite, SmallInputTest1) {
const size_t dataLen = 16;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
ASSERT_TRUE(mQueue->read(readData, dataLen));
@@ -485,13 +653,11 @@
* Write the queue when full. Verify that a subsequent writes is succesful.
* Verify that availableToWrite() returns 0 as expected.
*/
-
-TEST_F(UnsynchronizedWrite, WriteWhenFull) {
+TEST_F(UnsynchronizedWrite, WriteWhenFull1) {
ASSERT_EQ(0UL, mQueue->availableToRead());
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_TRUE(mQueue->write(&data[0], 1));
@@ -501,15 +667,36 @@
}
/*
+ * Fill the queue. Verify that a subsequent write
+ * using beginWrite()/commitWrite() is successful.
+ * Verify that the next read fails as expected for unsynchronized flavor.
+ */
+TEST_F(UnsynchronizedWrite, WriteWhenFull2) {
+ ASSERT_EQ(0UL, mQueue->availableToRead());
+ std::vector<uint8_t> data(mNumMessagesMax);
+ ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
+
+ MessageQueueUnsync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(1, &tx)); // one more message although the queue was just filled
+
+ ASSERT_EQ(tx.getFirstRegion().getLength(), 1U);
+
+ ASSERT_TRUE(tx.copyTo(&data[0], 0 /* startIdx */));
+
+ ASSERT_TRUE(mQueue->commitWrite(1));
+
+ std::vector<uint8_t> readData(mNumMessagesMax);
+ ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
+}
+
+/*
* Write a chunk of data equal to the queue size.
* Verify that the write is successful and the subsequent read
* returns the expected data.
*/
TEST_F(UnsynchronizedWrite, LargeInputTest1) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_TRUE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -526,9 +713,7 @@
const size_t dataLen = 4096;
ASSERT_GT(dataLen, mNumMessagesMax);
std::vector<uint8_t> data(dataLen);
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], dataLen);
ASSERT_FALSE(mQueue->write(&data[0], dataLen));
std::vector<uint8_t> readData(mNumMessagesMax);
ASSERT_FALSE(mQueue->read(&readData[0], mNumMessagesMax));
@@ -543,9 +728,7 @@
*/
TEST_F(UnsynchronizedWrite, LargeInputTest3) {
std::vector<uint8_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_TRUE(mQueue->write(&data[0], 1));
std::vector<uint8_t> readData(mNumMessagesMax);
@@ -561,9 +744,7 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
uint8_t readData[dataLen] = {};
for (size_t i = 0; i < chunkNum; i++) {
@@ -581,12 +762,12 @@
const size_t dataLen = chunkSize * chunkNum;
ASSERT_LE(dataLen, mNumMessagesMax);
uint8_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(data, dataLen);
for (size_t i = 0; i < chunkNum; i++) {
ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
}
+
uint8_t readData[dataLen] = {};
ASSERT_TRUE(mQueue->read(readData, dataLen));
ASSERT_EQ(0, memcmp(readData, data, dataLen));
@@ -599,12 +780,11 @@
* wrap around. Read and verify the data.
*/
TEST_F(UnsynchronizedWrite, ReadWriteWrapAround) {
- size_t numMessages = mNumMessagesMax / 2;
+ size_t numMessages = mNumMessagesMax - 1;
std::vector<uint8_t> data(mNumMessagesMax);
std::vector<uint8_t> readData(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i & 0xFF;
- }
+
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], numMessages));
ASSERT_TRUE(mQueue->read(&readData[0], numMessages));
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
diff --git a/tests/msgq_test_client.cpp b/tests/msgq_test_client.cpp
index 35a6fd7..0a891c6 100644
--- a/tests/msgq_test_client.cpp
+++ b/tests/msgq_test_client.cpp
@@ -50,6 +50,9 @@
} // namespace hardware
} // namespace android
+typedef MessageQueue<uint16_t, kSynchronizedReadWrite> MessageQueueSync;
+typedef MessageQueue<uint16_t, kUnsynchronizedWrite> MessageQueueUnsync;
+
class SynchronizedReadWriteClient : public ::testing::Test {
protected:
virtual void TearDown() {
@@ -64,7 +67,7 @@
mService->configureFmqSyncReadWrite([this](
bool ret, const MQDescriptorSync<uint16_t>& in) {
ASSERT_TRUE(ret);
- mQueue = new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>(in);
+ mQueue = new (std::nothrow) MessageQueueSync(in);
});
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
@@ -72,7 +75,7 @@
}
sp<ITestMsgQ> mService;
- MessageQueue<uint16_t, kSynchronizedReadWrite>* mQueue = nullptr;
+ MessageQueueSync* mQueue = nullptr;
size_t mNumMessagesMax = 0;
};
@@ -90,7 +93,7 @@
mService->configureFmqUnsyncWrite(
[this](bool ret, const MQDescriptorUnsync<uint16_t>& in) {
ASSERT_TRUE(ret);
- mQueue = new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>(in);
+ mQueue = new (std::nothrow) MessageQueueUnsync(in);
});
ASSERT_NE(nullptr, mQueue);
ASSERT_TRUE(mQueue->isValid());
@@ -98,7 +101,7 @@
}
sp<ITestMsgQ> mService;
- MessageQueue<uint16_t, kUnsynchronizedWrite>* mQueue = nullptr;
+ MessageQueueUnsync* mQueue = nullptr;
size_t mNumMessagesMax = 0;
};
@@ -113,6 +116,15 @@
}
/*
+ * Utility function to initialize data to be written to the FMQ: data[i] = i for every i in [0, count).
+ */
+inline void initData(uint16_t* data, size_t count) {
+ for (size_t i = 0; i < count; i++) {
+ data[i] = i; // the size_t index is implicitly narrowed to uint16_t
+ }
+}
+
+/*
* Test that basic blocking works using readBlocking()/writeBlocking() APIs
* using the EventFlag object owned by FMQ.
*/
@@ -264,7 +276,8 @@
ASSERT_TRUE(ret);
}
-/* Request mService to write a small number of messages
+/*
+ * Request mService to write a small number of messages
* to the FMQ. Read and verify data.
*/
TEST_F(SynchronizedReadWriteClient, SmallInputReaderTest1) {
@@ -278,6 +291,37 @@
}
/*
+ * Request mService to write a small number of messages
+ * to the FMQ. Read and verify each message using
+ * beginRead/commitRead APIs.
+ */
+TEST_F(SynchronizedReadWriteClient, SmallInputReaderTest2) {
+ const size_t dataLen = 16;
+ ASSERT_LE(dataLen, mNumMessagesMax);
+ auto ret = mService->requestWriteFmqSync(dataLen);
+
+ ASSERT_TRUE(ret.isOk());
+ ASSERT_TRUE(ret);
+
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginRead(dataLen, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+ size_t firstRegionLength = first.getLength();
+
+ for (size_t i = 0; i < dataLen; i++) { // message i is expected to hold the value i
+ if (i < firstRegionLength) {
+ ASSERT_EQ(i, *(first.getAddress() + i));
+ } else { // indices past the first region are looked up in the second region
+ ASSERT_EQ(i, *(second.getAddress() + i - firstRegionLength));
+ }
+ }
+
+ ASSERT_TRUE(mQueue->commitRead(dataLen));
+}
+
+/*
* Write a small number of messages to FMQ. Request
* mService to read and verify that the write was succesful.
*/
@@ -286,9 +330,7 @@
ASSERT_LE(dataLen, mNumMessagesMax);
size_t originalCount = mQueue->availableToWrite();
uint16_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i;
- }
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
bool ret = mService->requestReadFmqSync(dataLen);
ASSERT_TRUE(ret);
@@ -297,6 +339,44 @@
}
/*
+ * Write a small number of messages to FMQ using the beginWrite()/commitWrite()
+ * APIs. Request mService to read and verify that the write was successful.
+ */
+TEST_F(SynchronizedReadWriteClient, SmallInputWriterTest2) {
+ const size_t dataLen = 16;
+ ASSERT_LE(dataLen, mNumMessagesMax);
+ size_t originalCount = mQueue->availableToWrite();
+ uint16_t data[dataLen];
+ initData(data, dataLen);
+
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(dataLen, &tx));
+
+ auto first = tx.getFirstRegion();
+ auto second = tx.getSecondRegion();
+
+ size_t firstRegionLength = first.getLength();
+ uint16_t* firstBaseAddress = first.getAddress();
+ uint16_t* secondBaseAddress = second.getAddress();
+
+ for (size_t i = 0; i < dataLen; i++) { // store the value i as message i, directly in the returned regions
+ if (i < firstRegionLength) {
+ *(firstBaseAddress + i) = i;
+ } else { // indices past the first region go into the second region
+ *(secondBaseAddress + i - firstRegionLength) = i;
+ }
+ }
+
+ ASSERT_TRUE(mQueue->commitWrite(dataLen));
+
+ auto ret = mService->requestReadFmqSync(dataLen);
+ ASSERT_TRUE(ret.isOk());
+ ASSERT_TRUE(ret);
+ size_t availableCount = mQueue->availableToWrite();
+ ASSERT_EQ(originalCount, availableCount); // the space available to write must be back to its initial value
+}
+
+/*
* Verify that the FMQ is empty and read fails when it is empty.
*/
TEST_F(SynchronizedReadWriteClient, ReadWhenEmpty) {
@@ -318,9 +398,7 @@
TEST_F(SynchronizedReadWriteClient, WriteWhenFull) {
std::vector<uint16_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_FALSE(mQueue->write(&data[0], 1));
@@ -367,10 +445,7 @@
TEST_F(SynchronizedReadWriteClient, LargeInputTest3) {
std::vector<uint16_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i;
- }
-
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_FALSE(mQueue->write(&data[0], 1));
@@ -410,9 +485,8 @@
const size_t numMessages = chunkSize * chunkNum;
ASSERT_LE(numMessages, mNumMessagesMax);
uint16_t data[numMessages];
- for (size_t i = 0; i < numMessages; i++) {
- data[i] = i;
- }
+ initData(&data[0], numMessages);
+
for (size_t i = 0; i < chunkNum; i++) {
ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
}
@@ -429,9 +503,7 @@
TEST_F(SynchronizedReadWriteClient, ReadWriteWrapAround) {
size_t numMessages = mNumMessagesMax / 2;
std::vector<uint16_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], numMessages));
bool ret = mService->requestReadFmqSync(numMessages);
ASSERT_TRUE(ret);
@@ -440,7 +512,47 @@
ASSERT_TRUE(ret);
}
-/* Request mService to write a small number of messages
+/*
+ * Use beginWrite/commitWrite/getSlot APIs to test wrap arounds are handled
+ * correctly.
+ * Write enough messages into the FMQ to fill half of it
+ * and request mService to read back the same.
+ * Write mNumMessagesMax messages into the queue. This will cause a
+ * wrap around. Request mService to read them and check that it succeeds.
+ */
+TEST_F(SynchronizedReadWriteClient, ReadWriteWrapAround2) {
+ size_t numMessages = mNumMessagesMax / 2;
+ std::vector<uint16_t> data(mNumMessagesMax);
+ initData(&data[0], mNumMessagesMax);
+ ASSERT_TRUE(mQueue->write(&data[0], numMessages));
+ auto ret = mService->requestReadFmqSync(numMessages);
+
+ ASSERT_TRUE(ret.isOk());
+ ASSERT_TRUE(ret);
+
+ /*
+ * The next write and read will have to deal with wrap arounds.
+ */
+ MessageQueueSync::MemTransaction tx;
+ ASSERT_TRUE(mQueue->beginWrite(mNumMessagesMax, &tx));
+
+ ASSERT_EQ(tx.getFirstRegion().getLength() + tx.getSecondRegion().getLength(), mNumMessagesMax);
+
+ for (size_t i = 0; i < mNumMessagesMax; i++) {
+ uint16_t* ptr = tx.getSlot(i); // NOTE(review): the result is dereferenced without a null check - confirm getSlot() cannot return nullptr for i < mNumMessagesMax
+ *ptr = data[i];
+ }
+
+ ASSERT_TRUE(mQueue->commitWrite(mNumMessagesMax));
+
+ ret = mService->requestReadFmqSync(mNumMessagesMax);
+
+ ASSERT_TRUE(ret.isOk());
+ ASSERT_TRUE(ret);
+}
+
+/*
+ * Request mService to write a small number of messages
* to the FMQ. Read and verify data.
*/
TEST_F(UnsynchronizedWriteClient, SmallInputReaderTest1) {
@@ -460,11 +572,8 @@
TEST_F(UnsynchronizedWriteClient, SmallInputWriterTest1) {
const size_t dataLen = 16;
ASSERT_LE(dataLen, mNumMessagesMax);
- size_t originalCount = mQueue->availableToWrite();
uint16_t data[dataLen];
- for (size_t i = 0; i < dataLen; i++) {
- data[i] = i;
- }
+ initData(data, dataLen);
ASSERT_TRUE(mQueue->write(data, dataLen));
bool ret = mService->requestReadFmqUnsync(dataLen);
ASSERT_TRUE(ret);
@@ -492,9 +601,7 @@
TEST_F(UnsynchronizedWriteClient, WriteWhenFull) {
std::vector<uint16_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_TRUE(mQueue->write(&data[0], 1));
@@ -541,10 +648,7 @@
*/
TEST_F(UnsynchronizedWriteClient, LargeInputTest3) {
std::vector<uint16_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i;
- }
-
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], mNumMessagesMax));
ASSERT_EQ(0UL, mQueue->availableToWrite());
ASSERT_TRUE(mQueue->write(&data[0], 1));
@@ -588,9 +692,7 @@
const size_t numMessages = chunkSize * chunkNum;
ASSERT_LE(numMessages, mNumMessagesMax);
uint16_t data[numMessages];
- for (size_t i = 0; i < numMessages; i++) {
- data[i] = i;
- }
+ initData(data, numMessages);
for (size_t i = 0; i < chunkNum; i++) {
ASSERT_TRUE(mQueue->write(data + i * chunkSize, chunkSize));
}
@@ -607,9 +709,7 @@
TEST_F(UnsynchronizedWriteClient, ReadWriteWrapAround) {
size_t numMessages = mNumMessagesMax / 2;
std::vector<uint16_t> data(mNumMessagesMax);
- for (size_t i = 0; i < mNumMessagesMax; i++) {
- data[i] = i;
- }
+ initData(&data[0], mNumMessagesMax);
ASSERT_TRUE(mQueue->write(&data[0], numMessages));
bool ret = mService->requestReadFmqUnsync(numMessages);
ASSERT_TRUE(ret);