Merge "Update Stream Manager with stream Id"
diff --git a/computepipe/runner/include/MemHandle.h b/computepipe/runner/include/MemHandle.h
index 19c5c73..bd9198d 100644
--- a/computepipe/runner/include/MemHandle.h
+++ b/computepipe/runner/include/MemHandle.h
@@ -24,16 +24,18 @@
class MemHandle {
public:
+ /* Retrieve stream Id */
+ virtual int getStreamId() const = 0;
/* Retrieve packet type */
- virtual proto::PacketType getType() = 0;
+ virtual proto::PacketType getType() const = 0;
/* Retrieve packet time stamp */
- virtual uint64_t getTimeStamp() = 0;
+ virtual uint64_t getTimeStamp() const = 0;
/* Get size */
- virtual uint32_t getSize() = 0;
+ virtual uint32_t getSize() const = 0;
/* Get data, raw pointer. Only implemented for copy semantics */
- virtual const char* getData() = 0;
+ virtual const char* getData() const = 0;
/* Get native handle. data with zero copy semantics */
- virtual native_handle_t getNativeHandle() = 0;
+ virtual native_handle_t getNativeHandle() const = 0;
virtual ~MemHandle() {
}
diff --git a/computepipe/runner/stream_manager/Factory.cpp b/computepipe/runner/stream_manager/Factory.cpp
index 9a82028..d77016a 100644
--- a/computepipe/runner/stream_manager/Factory.cpp
+++ b/computepipe/runner/stream_manager/Factory.cpp
@@ -31,7 +31,7 @@
const proto::OutputConfig& config, std::function<Status(std::shared_ptr<MemHandle>)>& cb,
uint32_t maxPackets) {
std::unique_ptr<SemanticManager> semanticManager =
- std::make_unique<SemanticManager>(config.stream_name(), config.type());
+ std::make_unique<SemanticManager>(config.stream_name(), config.stream_id(), config.type());
if (semanticManager->setIpcDispatchCallback(cb) != SUCCESS) {
return nullptr;
}
diff --git a/computepipe/runner/stream_manager/SemanticManager.cpp b/computepipe/runner/stream_manager/SemanticManager.cpp
index 2f57dbf..a874b96 100644
--- a/computepipe/runner/stream_manager/SemanticManager.cpp
+++ b/computepipe/runner/stream_manager/SemanticManager.cpp
@@ -10,34 +10,39 @@
namespace runner {
namespace stream_manager {
-proto::PacketType SemanticHandle::getType() {
+proto::PacketType SemanticHandle::getType() const {
return mType;
}
-uint64_t SemanticHandle::getTimeStamp() {
+uint64_t SemanticHandle::getTimeStamp() const {
return mTimestamp;
}
-uint32_t SemanticHandle::getSize() {
+uint32_t SemanticHandle::getSize() const {
return mSize;
}
-const char* SemanticHandle::getData() {
+const char* SemanticHandle::getData() const {
return mData;
}
-native_handle_t SemanticHandle::getNativeHandle() {
+native_handle_t SemanticHandle::getNativeHandle() const {
native_handle_t temp;
temp.numFds = 0;
temp.numInts = 0;
return temp;
}
-Status SemanticHandle::setMemInfo(const char* data, uint32_t size, uint64_t timestamp,
+int SemanticHandle::getStreamId() const {
+ return mStreamId;
+}
+
+Status SemanticHandle::setMemInfo(int streamId, const char* data, uint32_t size, uint64_t timestamp,
const proto::PacketType& type) {
if (data == nullptr || size == 0 || size > kMaxSemanticDataSize) {
return INVALID_ARGUMENT;
}
+ mStreamId = streamId;
mData = (char*)malloc(size);
if (!mData) {
return NO_MEMORY;
@@ -138,7 +143,7 @@
return INTERNAL_ERROR;
}
auto memHandle = std::make_shared<SemanticHandle>();
- auto status = memHandle->setMemInfo(data, size, timestamp, mType);
+ auto status = memHandle->setMemInfo(mStreamId, data, size, timestamp, mType);
if (status != SUCCESS) {
return status;
}
@@ -146,8 +151,8 @@
return SUCCESS;
}
-SemanticManager::SemanticManager(std::string name, const proto::PacketType& type)
- : StreamManager(name, type) {
+SemanticManager::SemanticManager(std::string name, int streamId, const proto::PacketType& type)
+ : StreamManager(name, type), mStreamId(streamId) {
}
} // namespace stream_manager
} // namespace runner
diff --git a/computepipe/runner/stream_manager/SemanticManager.h b/computepipe/runner/stream_manager/SemanticManager.h
index c738718..687e0b9 100644
--- a/computepipe/runner/stream_manager/SemanticManager.h
+++ b/computepipe/runner/stream_manager/SemanticManager.h
@@ -31,17 +31,17 @@
class SemanticHandle : public MemHandle {
public:
static constexpr uint32_t kMaxSemanticDataSize = 1024;
- proto::PacketType getType() override;
- /* Retrieve packet time stamp */
- uint64_t getTimeStamp() override;
- /* Get size */
- uint32_t getSize() override;
- /* Get data, raw pointer. Only implemented for copy semantics */
- const char* getData() override;
- /* Get native handle. data with zero copy semantics */
- native_handle_t getNativeHandle() override;
+ /**
+ * Override mem handle methods
+ */
+ int getStreamId() const override;
+ proto::PacketType getType() const override;
+ uint64_t getTimeStamp() const override;
+ uint32_t getSize() const override;
+ const char* getData() const override;
+ native_handle_t getNativeHandle() const override;
/* set info for the memory. Make a copy */
- Status setMemInfo(const char* data, uint32_t size, uint64_t timestamp,
+ Status setMemInfo(int streamId, const char* data, uint32_t size, uint64_t timestamp,
const proto::PacketType& type);
/* Destroy local copy */
~SemanticHandle();
@@ -51,6 +51,7 @@
uint32_t mSize;
uint64_t mTimestamp;
proto::PacketType mType;
+ int mStreamId;
};
class SemanticManager : public StreamManager, StreamManagerInit {
@@ -69,11 +70,12 @@
Status handleStopWithFlushPhase(const RunnerEvent& e) override;
Status handleStopImmediatePhase(const RunnerEvent& e) override;
- explicit SemanticManager(std::string name, const proto::PacketType& type);
+ explicit SemanticManager(std::string name, int streamId, const proto::PacketType& type);
~SemanticManager() = default;
private:
std::mutex mStateLock;
+ int mStreamId;
std::function<Status(const std::shared_ptr<MemHandle>&)> mDispatchCallback = nullptr;
};
} // namespace stream_manager