Add Default Runner Engine Implementation
Bug: 146898311
Test: sample client and graph implementations
Change-Id: I247d46f53fb4175de1a5c5dc208d460ec4c4939a
diff --git a/computepipe/products/computepipe.mk b/computepipe/products/computepipe.mk
index eb8efc1..d5fe7f7 100644
--- a/computepipe/products/computepipe.mk
+++ b/computepipe/products/computepipe.mk
@@ -18,6 +18,16 @@
# Enable computepipe router
PRODUCT_PACKAGES += android.automotive.computepipe.router@1.0
+# Enable computepipe runner engine library
+PRODUCT_PACKAGES += computepipe_runner_engine
+
+# Enable computepipe runner engine client interface library
+PRODUCT_PACKAGES += computepipe_client_interface
+
+# Enable computepipe runner engine prebuilt graph library
+PRODUCT_PACKAGES += computepipe_prebuilt_graph
+
+
# Selinux public policies for computepipe services
BOARD_PLAT_PUBLIC_SEPOLICY_DIR += packages/services/Car/computepipe/sepolicy/public
diff --git a/computepipe/runner/Android.bp b/computepipe/runner/Android.bp
index d17ab03..94e5928 100644
--- a/computepipe/runner/Android.bp
+++ b/computepipe/runner/Android.bp
@@ -18,10 +18,6 @@
static_libs: [
"libcomputepipeprotos",
],
- visibility: [
- "//packages/services/Car/computepipe/runner:__subpackages__",
- "//packages/services/Car/computepipe/tests:__subpackages__",
- ],
}
cc_library {
diff --git a/computepipe/runner/engine/Android.bp b/computepipe/runner/engine/Android.bp
index 89c5d29..ca58821 100644
--- a/computepipe/runner/engine/Android.bp
+++ b/computepipe/runner/engine/Android.bp
@@ -16,12 +16,45 @@
name: "computepipe_runner_engine",
srcs: [
"ConfigBuilder.cpp",
+ "DefaultEngine.cpp",
+ "Factory.cpp",
],
+ export_include_dirs: ["include"],
header_libs: [
"computepipe_runner_includes",
],
- shared_libs: [
+ static_libs: [
+ "libcomputepipeprotos",
"computepipe_runner_component",
+ "computepipe_input_manager",
+ "libcomputepipe_stream_manager",
+ ],
+ shared_libs: [
+     // Each library is listed exactly once.
+     "computepipe_client_interface",
+     "computepipe_prebuilt_graph",
+     "android.hardware.automotive.evs@1.0",
+     "libbase",
+     "libcutils",
+     "libdl",
+     "libevssupport",
+     "libhardware",
+     "libhidlbase",
+     "liblog",
+     "libpng",
+     "libprotobuf-cpp-lite",
+     "libui",
+     "libutils",
+     "libEGL",
+     "libGLESv2",
+ ],
+ cflags: [
+ "-D_LIBCPP_ENABLE_THREAD_SAFETY_ANNOTATIONS",
+ "-Wall",
+ "-Werror",
+ "-Wunused",
+ "-Wunreachable-code",
+ "-Wthread-safety",
],
include_dirs: [
"packages/services/Car/computepipe",
diff --git a/computepipe/runner/engine/ConfigBuilder.cpp b/computepipe/runner/engine/ConfigBuilder.cpp
index 38ed417..99a3b6b 100644
--- a/computepipe/runner/engine/ConfigBuilder.cpp
+++ b/computepipe/runner/engine/ConfigBuilder.cpp
@@ -20,12 +20,24 @@
namespace runner {
namespace engine {
+/**
+ * Remembers |id| as the debug display stream and registers it as an output
+ * stream with a single in-flight packet. An entry that already exists for
+ * |id| is left untouched, because emplace() never overwrites.
+ */
+void ConfigBuilder::setDebugDisplayStream(int id) {
+    constexpr int kDisplayStreamMaxInFlightPackets = 1;
+    mDisplayStream = id;
+    mOutputConfig.emplace(id, kDisplayStreamMaxInFlightPackets);
+}
+
+/**
+ * Reports whether an output stream update has named the debug display stream
+ * since the flag was last cleared.
+ */
+bool ConfigBuilder::clientConfigEnablesDisplayStream() {
+    const bool enabledByClient = mConfigHasDisplayStream;
+    return enabledByClient;
+}
+
ConfigBuilder& ConfigBuilder::updateInputConfigOption(int id) {
mInputConfigId = id;
return *this;
}
ConfigBuilder& ConfigBuilder::updateOutputStreamOption(int id, int maxInFlightPackets) {
+ if (id == mDisplayStream) {
+ mConfigHasDisplayStream = true;
+ }
mOutputConfig.emplace(id, maxInFlightPackets);
return *this;
}
@@ -54,6 +66,7 @@
mTerminationId = ClientConfig::kInvalidId;
mOffloadId = ClientConfig::kInvalidId;
mOutputConfig.clear();
+ mConfigHasDisplayStream = false;
return *this;
}
diff --git a/computepipe/runner/engine/ConfigBuilder.h b/computepipe/runner/engine/ConfigBuilder.h
index be0ac87..7937c1d 100644
--- a/computepipe/runner/engine/ConfigBuilder.h
+++ b/computepipe/runner/engine/ConfigBuilder.h
@@ -27,6 +27,14 @@
class ConfigBuilder {
public:
/**
+ * Set debug display stream in the final client config
+ */
+ void setDebugDisplayStream(int id);
+ /**
+ * Does client explicitly enable display stream
+ */
+ bool clientConfigEnablesDisplayStream();
+ /**
* Update current input option
*/
ConfigBuilder& updateInputConfigOption(int id);
@@ -56,9 +64,11 @@
ConfigBuilder& reset();
private:
+ // Debug display stream id. Starts out invalid so that
+ // updateOutputStreamOption() never compares against an indeterminate value
+ // when no display stream has been set.
+ int mDisplayStream = ClientConfig::kInvalidId;
int mInputConfigId;
int mOffloadId;
int mTerminationId;
+ bool mConfigHasDisplayStream = false;
std::map<int, int> mOutputConfig;
std::string mOptionalConfig;
};
diff --git a/computepipe/runner/engine/DefaultEngine.cpp b/computepipe/runner/engine/DefaultEngine.cpp
new file mode 100644
index 0000000..92eabf2
--- /dev/null
+++ b/computepipe/runner/engine/DefaultEngine.cpp
@@ -0,0 +1,621 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "DefaultEngine.h"
+
+#include <android-base/logging.h>
+
+#include <algorithm>
+#include <cassert>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "ClientInterface.h"
+#include "EventGenerator.h"
+#include "InputFrame.h"
+#include "PrebuiltGraph.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+using android::automotive::computepipe::graph::PrebuiltGraph;
+using android::automotive::computepipe::runner::client_interface::ClientInterface;
+using android::automotive::computepipe::runner::generator::DefaultEvent;
+using android::automotive::computepipe::runner::input_manager::InputEngineInterface;
+using android::automotive::computepipe::runner::stream_manager::StreamEngineInterface;
+using android::automotive::computepipe::runner::stream_manager::StreamManager;
+
+namespace {
+
+int getStreamIdFromSource(std::string source) {
+    // The id is whatever follows the first ':' (e.g. "StreamManager:<id>").
+    // Without a ':' the whole string is handed to std::stoi.
+    const std::string::size_type separator = source.find(":");
+    const std::string idText = source.substr(separator + 1);
+    return std::stoi(idText);
+}
+} // namespace
+
+/**
+ * Takes ownership of the client interface used for all client notifications.
+ */
+void DefaultEngine::setClientInterface(std::unique_ptr<ClientInterface>&& client) {
+    mClient = std::move(client);
+}
+
+/**
+ * Takes ownership of the prebuilt graph and caches the options it supports.
+ * A null graph is accepted: the engine then runs without a local graph and
+ * the cached descriptor is emptied.
+ */
+void DefaultEngine::setPrebuiltGraph(std::unique_ptr<PrebuiltGraph>&& graph) {
+    mGraph = std::move(graph);
+    if (!mGraph) {
+        // Every other use of mGraph is guarded by a null check; do the same here.
+        mGraphDescriptor = proto::Options();
+        return;
+    }
+    mGraphDescriptor = mGraph->GetSupportedGraphConfigs();
+}
+
+/**
+ * Parses the engine argument string.
+ *
+ * Recognised tokens:
+ *  - kNoInputManager: no input managers are created when configs are applied.
+ *  - kDisplayStreamId followed by a non-negative decimal id: that stream is
+ *    registered as the debug display stream.
+ *
+ * @param engine_args raw argument string.
+ * @return Status::SUCCESS, or Status::INVALID_ARGUMENT when kDisplayStreamId
+ *         is present but not followed by a usable stream id.
+ */
+Status DefaultEngine::setArgs(std::string engine_args) {
+    if (engine_args.find(kNoInputManager) != std::string::npos) {
+        mIgnoreInputManager = true;
+    }
+    const auto keyPos = engine_args.find(kDisplayStreamId);
+    if (keyPos == std::string::npos) {
+        return Status::SUCCESS;
+    }
+    // sizeof() counts the terminating NUL of the array, hence the -1.
+    const std::string value = engine_args.substr(keyPos + sizeof(kDisplayStreamId) - 1);
+    // Validate before converting: std::stoi reports malformed or out-of-range
+    // input by throwing, which a bad argument string must not be able to trigger.
+    const auto first = value.find_first_not_of(" \t");
+    std::string digits;
+    if (first != std::string::npos) {
+        digits = value.substr(first, value.find_first_not_of("0123456789", first) - first);
+    }
+    // Nine decimal digits always fit in a 32 bit int.
+    if (digits.empty() || digits.size() > 9) {
+        LOG(ERROR) << "Engine::invalid display stream id in engine args";
+        return Status::INVALID_ARGUMENT;
+    }
+    mDisplayStream = std::stoi(digits);
+    mConfigBuilder.setDebugDisplayStream(mDisplayStream);
+    return Status::SUCCESS;
+}
+
+/**
+ * Brings the engine online: resets the config builder, starts the command
+ * processing thread and activates the client interface.
+ *
+ * @return Status::ILLEGAL_STATE when no client interface has been set,
+ *         otherwise the status returned by the client interface.
+ */
+Status DefaultEngine::activate() {
+    if (!mClient) {
+        LOG(ERROR) << "Engine::activate called without a client interface";
+        return Status::ILLEGAL_STATE;
+    }
+    // NOTE(review): setArgs() registers the debug display stream in the
+    // builder before this reset - confirm that reset() keeps it.
+    mConfigBuilder.reset();
+    // Start the looper only once. Replacing a running std::thread object
+    // would destroy a joinable thread, which calls std::terminate().
+    if (!mEngineThread) {
+        mEngineThread = std::make_unique<std::thread>(&DefaultEngine::processCommands, this);
+    }
+    return mClient->activate();
+}
+
+/**
+ * Records one configuration option chosen by the client.
+ *
+ * Options are only accepted while the runner is in the reset phase. They are
+ * accumulated in mConfigBuilder.
+ *
+ * @return Status::ILLEGAL_STATE outside the reset phase, Status::SUCCESS
+ *         otherwise (a command carrying no known option is a no-op).
+ */
+Status DefaultEngine::processClientConfigUpdate(const proto::ConfigurationCommand& command) {
+    std::lock_guard<std::mutex> lock(mEngineLock);
+    if (mCurrentPhase != kResetPhase) {
+        return Status::ILLEGAL_STATE;
+    }
+    // The update methods modify the builder in place; the returned reference
+    // is only for chaining and must not be assigned back onto the builder.
+    if (command.has_set_input_source()) {
+        mConfigBuilder.updateInputConfigOption(command.set_input_source().source_id());
+    } else if (command.has_set_termination_option()) {
+        mConfigBuilder.updateTerminationOption(
+                command.set_termination_option().termination_option_id());
+    } else if (command.has_set_output_stream()) {
+        mConfigBuilder.updateOutputStreamOption(
+                command.set_output_stream().stream_id(),
+                command.set_output_stream().max_inflight_packets_count());
+    } else if (command.has_set_offload_offload()) {
+        mConfigBuilder.updateOffloadOption(command.set_offload_offload().offload_option_id());
+    }
+    return Status::SUCCESS;
+}
+
+/**
+ * Handles a control command issued by the client.
+ *
+ * Apply-configs, start and stop requests are checked against the current
+ * phase and then queued for the engine thread. The call returns once the
+ * request is queued, not once it has been carried out.
+ *
+ * @return Status::ILLEGAL_STATE when the request is not allowed in the current
+ *         phase, Status::SUCCESS otherwise (including unrecognised commands).
+ */
+Status DefaultEngine::processClientCommand(const proto::ControlCommand& command) {
+    std::lock_guard<std::mutex> lock(mEngineLock);
+
+    if (command.has_apply_configs()) {
+        if (mCurrentPhase != kResetPhase) {
+            return Status::ILLEGAL_STATE;
+        }
+        queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_CONFIG);
+        return Status::SUCCESS;
+    }
+    if (command.has_start_graph()) {
+        if (mCurrentPhase != kConfigPhase) {
+            return Status::ILLEGAL_STATE;
+        }
+        queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_START_RUN);
+        return Status::SUCCESS;
+    }
+    if (command.has_stop_graph()) {
+        if (mCurrentPhase != kRunPhase) {
+            return Status::ILLEGAL_STATE;
+        }
+        mStopFromClient = true;
+        queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_INITIATE_STOP);
+        return Status::SUCCESS;
+    }
+    if (command.has_death_notification()) {
+        // The engine thread only wakes up for queued commands or for
+        // mCurrentPhaseError, so client death has to be published there;
+        // pushing it onto mErrorQueue alone would never be acted upon.
+        // Client death takes precedence over an error that is still pending,
+        // which is archived instead of being lost.
+        if (mCurrentPhaseError) {
+            mErrorQueue.push(*mCurrentPhaseError);
+        }
+        mCurrentPhaseError = std::make_unique<ComponentError>("ClientInterface", "Client death",
+                                                              mCurrentPhase, false);
+        mWakeLooper.notify_all();
+        return Status::SUCCESS;
+    }
+    return Status::SUCCESS;
+}
+
+/**
+ * Returns a packet the client has finished with to the stream manager that
+ * produced it.
+ *
+ * @return Status::INVALID_ARGUMENT for a null packet or a packet whose stream
+ *         has no stream manager, Status::SUCCESS otherwise.
+ */
+Status DefaultEngine::freePacket(const std::shared_ptr<MemHandle>& packet) {
+    if (!packet) {
+        return Status::INVALID_ARGUMENT;
+    }
+    // find() instead of operator[]: the latter would insert a null manager
+    // for an unknown stream id and then dereference it.
+    const auto it = mStreamManagers.find(packet->getStreamId());
+    if (it == mStreamManagers.end() || !it->second) {
+        LOG(ERROR) << "Engine::no stream manager for freed packet of stream "
+                   << packet->getStreamId();
+        return Status::INVALID_ARGUMENT;
+    }
+    it->second->freePacket(packet);
+    return Status::SUCCESS;
+}
+
+/**
+ * Methods from PrebuiltEngineInterface
+ */
+void DefaultEngine::DispatchPixelData(int /* streamId */, int64_t /* timestamp */,
+                                      const uint8_t* /* pixels */, int /* width */,
+                                      int /* height */, int /* step */, PixelFormat /* format*/) {
+    // Pixel data is currently dropped.
+    // TODO: b/147975150 Add pixel stream forwarding to stream manager.
+}
+
+/**
+ * Hands serialized graph output to the stream manager of |streamId|.
+ * Output for a stream without a stream manager is logged and dropped.
+ */
+void DefaultEngine::DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) {
+    LOG(INFO) << "Engine::Received data for stream " << streamId << " with timestamp " << timestamp;
+    const auto it = mStreamManagers.find(streamId);
+    if (it == mStreamManagers.end() || !it->second) {
+        LOG(ERROR) << "Engine::Received bad stream id from prebuilt graph";
+        // Bail out: indexing the map here would insert and dereference a null manager.
+        return;
+    }
+    it->second->queuePacket(output.c_str(), output.size(), timestamp);
+}
+
+/**
+ * Called by the prebuilt graph when it terminates.
+ *
+ * A successful termination during the run phase queues a stop request.
+ * Any other status is reported as a component error from "PrebuiltGraph".
+ */
+void DefaultEngine::DispatchGraphTerminationMessage(Status s, std::string&& msg) {
+    if (s != SUCCESS) {
+        // queueError() acquires mEngineLock itself, so it must be called
+        // before this function takes the (non-recursive) lock.
+        queueError("PrebuiltGraph", std::move(msg), false);
+        return;
+    }
+    std::lock_guard<std::mutex> lock(mEngineLock);
+    if (mCurrentPhase == kRunPhase) {
+        queueCommand("PrebuiltGraph", EngineCommand::Type::BROADCAST_INITIATE_STOP);
+    } else {
+        LOG(WARNING) << "Graph termination when not in run phase";
+    }
+}
+
+/**
+ * Applies the accumulated client configuration.
+ *
+ * Creates stream managers and (with a local graph) input managers, walks the
+ * graph through config entry and transition complete, then informs the
+ * client. On any failure the created managers are dropped and the client is
+ * sent the configuration in ABORTED state. On success the runner is in the
+ * config phase.
+ *
+ * @return Status::SUCCESS, or the status of the step that failed.
+ */
+Status DefaultEngine::broadcastClientConfig() {
+    ClientConfig config = mConfigBuilder.emitClientOptions();
+    // Every abort path must mark the config ABORTED first. Otherwise the
+    // client is handed a config that is still in ENTRY or TRANSITION_COMPLETE state.
+    auto abortConfig = [this, &config](bool resetGraph) {
+        config.setPhaseState(PhaseState::ABORTED);
+        abortClientConfig(config, resetGraph);
+    };
+
+    LOG(INFO) << "Engine::create stream manager";
+    Status ret = populateStreamManagers(config);
+    if (ret != Status::SUCCESS) {
+        // Some stream managers may already exist; drop them and tell the client.
+        abortConfig(false);
+        return ret;
+    }
+
+    if (mGraph) {
+        ret = populateInputManagers(config);
+        if (ret != Status::SUCCESS) {
+            abortConfig(false);
+            return ret;
+        }
+
+        LOG(INFO) << "Engine::send client config entry to graph";
+        config.setPhaseState(PhaseState::ENTRY);
+        ret = mGraph->handleConfigPhase(config);
+        if (ret != Status::SUCCESS) {
+            abortConfig(false);
+            return ret;
+        }
+        LOG(INFO) << "Engine::send client config transition complete to graph";
+        config.setPhaseState(PhaseState::TRANSITION_COMPLETE);
+        ret = mGraph->handleConfigPhase(config);
+        if (ret != Status::SUCCESS) {
+            abortConfig(false);
+            return ret;
+        }
+    }
+    LOG(INFO) << "Engine::Graph configured";
+    // TODO add handling for remote graph
+    ret = mClient->handleConfigPhase(config);
+    if (ret != Status::SUCCESS) {
+        // The graph already accepted this config, so it is told about the abort too.
+        abortConfig(true);
+        return ret;
+    }
+    mCurrentPhase = kConfigPhase;
+    return Status::SUCCESS;
+}
+
+/**
+ * Drops all stream and input managers and forwards |config| to the client,
+ * and before that to the graph when |resetGraph| is set and a graph exists.
+ */
+void DefaultEngine::abortClientConfig(const ClientConfig& config, bool resetGraph) {
+    mStreamManagers.clear();
+    mInputManagers.clear();
+
+    const bool notifyGraph = resetGraph && mGraph;
+    if (notifyGraph) {
+        (void)mGraph->handleConfigPhase(config);
+    }
+    // TODO add handling for remote graph
+    (void)mClient->handleConfigPhase(config);
+}
+
+/**
+ * Moves all components into the run phase.
+ *
+ * Run entry is sent to the stream managers, then the graph, then the input
+ * managers. If a component refuses, the components that already accepted are
+ * told to abort and the error is returned. Once the client acknowledges the
+ * transition, transition complete is sent to every component and the runner
+ * is in the run phase.
+ *
+ * @return Status::SUCCESS, or the error of the step that failed.
+ */
+Status DefaultEngine::broadcastStartRun() {
+    DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RUN);
+
+    std::vector<int> successfulStreams;
+    std::vector<int> successfulInputs;
+    for (auto& it : mStreamManagers) {
+        LOG(INFO) << "Engine::sending start run to stream manager " << it.first;
+        if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
+            LOG(ERROR) << "Engine::failure to enter run phase for stream " << it.first;
+            broadcastAbortRun(successfulStreams, successfulInputs);
+            return Status::INTERNAL_ERROR;
+        }
+        successfulStreams.push_back(it.first);
+    }
+    // TODO: send to remote
+    Status ret = Status::SUCCESS;
+    if (mGraph) {
+        LOG(INFO) << "Engine::sending start run to prebuilt";
+        ret = mGraph->handleExecutionPhase(runEvent);
+        if (ret != Status::SUCCESS) {
+            LOG(ERROR) << "Engine::failure to enter run phase for prebuilt graph";
+            broadcastAbortRun(successfulStreams, successfulInputs);
+            // Stop here: the run was aborted, nothing further may be started.
+            return ret;
+        }
+        for (auto& it : mInputManagers) {
+            if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
+                LOG(ERROR) << "Engine::failure to enter run phase for input manager " << it.first;
+                broadcastAbortRun(successfulStreams, successfulInputs, true);
+                return Status::INTERNAL_ERROR;
+            }
+            successfulInputs.push_back(it.first);
+        }
+    }
+    runEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RUN);
+    LOG(INFO) << "Engine::sending run transition complete to client";
+    ret = mClient->handleExecutionPhase(runEvent);
+    if (ret != Status::SUCCESS) {
+        LOG(ERROR) << "Engine::client failure to acknowledge transition to run complete ";
+        broadcastAbortRun(successfulStreams, successfulInputs, true);
+        return ret;
+    }
+    for (auto& it : mStreamManagers) {
+        (void)it.second->handleExecutionPhase(runEvent);
+    }
+    // TODO: send to remote
+    if (mGraph) {
+        LOG(INFO) << "Engine::sending run transition complete to prebuilt";
+        (void)mGraph->handleExecutionPhase(runEvent);
+        for (auto& it : mInputManagers) {
+            (void)it.second->handleExecutionPhase(runEvent);
+        }
+    }
+    LOG(INFO) << "Engine::Running";
+    mCurrentPhase = kRunPhase;
+    return Status::SUCCESS;
+}
+
+/**
+ * Sends the run abort event to the listed stream managers, then to the listed
+ * input managers, and finally to the graph when |abortGraph| is set and a
+ * graph exists.
+ */
+void DefaultEngine::broadcastAbortRun(const std::vector<int>& streamIds,
+                                      const std::vector<int>& inputIds, bool abortGraph) {
+    DefaultEvent runEvent = DefaultEvent::generateAbortEvent(DefaultEvent::RUN);
+    const DefaultEvent& managerEvent = runEvent;
+
+    for (const int streamId : streamIds) {
+        (void)mStreamManagers[streamId]->handleExecutionPhase(managerEvent);
+    }
+    for (const int inputId : inputIds) {
+        (void)mInputManagers[inputId]->handleExecutionPhase(managerEvent);
+    }
+    if (abortGraph && mGraph) {
+        (void)mGraph->handleExecutionPhase(runEvent);
+    }
+}
+
+/**
+ * Sends stop-with-flush entry to the components and moves the runner into the
+ * stop phase. The graph is only notified when the client asked for the stop,
+ * the client only when it did not. Always returns Status::SUCCESS.
+ */
+Status DefaultEngine::broadcastStopWithFlush() {
+    DefaultEvent flushEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_WITH_FLUSH);
+    const bool clientRequestedStop = mStopFromClient;
+
+    if (mGraph) {
+        for (auto& input : mInputManagers) {
+            (void)input.second->handleStopWithFlushPhase(flushEvent);
+        }
+        if (clientRequestedStop) {
+            (void)mGraph->handleStopWithFlushPhase(flushEvent);
+        }
+    }
+    // TODO: send to remote.
+    for (auto& stream : mStreamManagers) {
+        (void)stream.second->handleStopWithFlushPhase(flushEvent);
+    }
+    if (!clientRequestedStop) {
+        (void)mClient->handleStopWithFlushPhase(flushEvent);
+    }
+    mCurrentPhase = kStopPhase;
+    return Status::SUCCESS;
+}
+
+/**
+ * Sends stop-with-flush transition complete to input managers and graph (when
+ * a graph exists), stream managers and the client, then moves the runner back
+ * to the config phase. Always returns Status::SUCCESS.
+ */
+Status DefaultEngine::broadcastStopComplete() {
+    DefaultEvent completeEvent =
+            DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_WITH_FLUSH);
+
+    if (mGraph) {
+        for (auto& input : mInputManagers) {
+            (void)input.second->handleStopWithFlushPhase(completeEvent);
+        }
+        (void)mGraph->handleStopWithFlushPhase(completeEvent);
+    }
+    // TODO: send to remote.
+    for (auto& stream : mStreamManagers) {
+        (void)stream.second->handleStopWithFlushPhase(completeEvent);
+    }
+    (void)mClient->handleStopWithFlushPhase(completeEvent);
+
+    mCurrentPhase = kConfigPhase;
+    return Status::SUCCESS;
+}
+
+/**
+ * Sends stop-immediate entry and then transition complete to all components,
+ * skipping the component named as source of the pending error, and moves the
+ * runner back to the config phase. Dereferences mCurrentPhaseError, so it
+ * must be set by the caller.
+ */
+void DefaultEngine::broadcastHalt() {
+    const std::string& errorSource = mCurrentPhaseError->source;
+    const bool graphIsSource = errorSource.find("PrebuiltGraph") != std::string::npos;
+    const bool clientIsSource = errorSource.find("ClientInterface") != std::string::npos;
+
+    // Both rounds notify the components in the same order.
+    const auto notifyComponents = [this, graphIsSource, clientIsSource](DefaultEvent& event) {
+        if (mGraph) {
+            for (auto& input : mInputManagers) {
+                (void)input.second->handleStopImmediatePhase(event);
+            }
+            // TODO: send to graph or remote if client was source.
+            if (!graphIsSource) {
+                (void)mGraph->handleStopImmediatePhase(event);
+            }
+        }
+        for (auto& stream : mStreamManagers) {
+            (void)stream.second->handleStopImmediatePhase(event);
+        }
+        if (!clientIsSource) {
+            (void)mClient->handleStopImmediatePhase(event);
+        }
+    };
+
+    DefaultEvent entryEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_IMMEDIATE);
+    notifyComponents(entryEvent);
+
+    DefaultEvent completeEvent =
+            DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_IMMEDIATE);
+    notifyComponents(completeEvent);
+
+    mCurrentPhase = kConfigPhase;
+}
+
+/**
+ * Drops all stream and input managers, sends reset entry and then reset
+ * transition complete to the client and the graph, resets the config builder
+ * and puts the runner into the reset phase.
+ */
+void DefaultEngine::broadcastReset() {
+    mStreamManagers.clear();
+    mInputManagers.clear();
+
+    const auto notifyClientAndGraph = [this](DefaultEvent& event) {
+        (void)mClient->handleResetPhase(event);
+        if (mGraph) {
+            (void)mGraph->handleResetPhase(event);
+        }
+    };
+
+    DefaultEvent entryEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RESET);
+    notifyClientAndGraph(entryEvent);
+    DefaultEvent completeEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RESET);
+    notifyClientAndGraph(completeEvent);
+
+    // TODO: send to remote runner
+    mConfigBuilder.reset();
+    mCurrentPhase = kResetPhase;
+    mStopFromClient = false;
+}
+
+/**
+ * Creates one stream manager per output stream selected in |config|.
+ *
+ * On failure no stream manager is left registered.
+ *
+ * @return Status::ILLEGAL_STATE when the config carries no output streams,
+ *         Status::INVALID_ARGUMENT when a requested stream is not offered by
+ *         the graph descriptor, Status::INTERNAL_ERROR when a manager cannot
+ *         be created, Status::SUCCESS otherwise.
+ */
+Status DefaultEngine::populateStreamManagers(const ClientConfig& config) {
+    std::map<int, int> outputConfigs;
+    if (config.getOutputStreamConfigs(outputConfigs) != Status::SUCCESS) {
+        return Status::ILLEGAL_STATE;
+    }
+    for (const auto& configIt : outputConfigs) {
+        const int streamId = configIt.first;
+        const int maxInFlightPackets = configIt.second;
+        proto::OutputConfig outputDescriptor;
+        // find the output descriptor for requested stream id
+        bool foundDesc = false;
+        for (const auto& optionIt : mGraphDescriptor.output_configs()) {
+            if (optionIt.stream_id() == streamId) {
+                outputDescriptor = optionIt;
+                foundDesc = true;
+                break;
+            }
+        }
+        if (!foundDesc) {
+            LOG(ERROR) << "no matching output config for requested id " << streamId;
+            mStreamManagers.clear();
+            return Status::INVALID_ARGUMENT;
+        }
+        std::function<Status(std::shared_ptr<MemHandle>)> packetCb =
+                [this, streamId](std::shared_ptr<MemHandle> handle) -> Status {
+            return this->forwardOutputDataToClient(streamId, handle);
+        };
+
+        // Keep the message out of the source string: the source is matched
+        // with find() to identify the failing component.
+        std::function<void(std::string)> errorCb = [this, streamId](std::string m) {
+            std::string source = "StreamManager:" + std::to_string(streamId);
+            this->queueError(source, m, false);
+        };
+
+        std::function<void()> eos = [this, streamId]() {
+            std::string source = "StreamManager:" + std::to_string(streamId);
+            std::lock_guard<std::mutex> lock(this->mEngineLock);
+            this->queueCommand(source, EngineCommand::Type::POLL_COMPLETE);
+        };
+
+        std::shared_ptr<StreamEngineInterface> engine = std::make_shared<StreamCallback>(
+                std::move(eos), std::move(errorCb), std::move(packetCb));
+        // Check the manager before registering it, so that a failed creation
+        // never leaves a null entry behind for later broadcasts to dereference.
+        auto manager = mStreamFactory.getStreamManager(outputDescriptor, engine, maxInFlightPackets);
+        if (manager == nullptr) {
+            LOG(ERROR) << "unable to create stream manager for stream " << streamId;
+            mStreamManagers.clear();
+            return Status::INTERNAL_ERROR;
+        }
+        mStreamManagers[streamId] = std::move(manager);
+    }
+    return Status::SUCCESS;
+}
+
+/**
+ * Forwards an output packet of |streamId| to the client interface and returns
+ * the client interface's status.
+ */
+Status DefaultEngine::forwardOutputDataToClient(int streamId,
+                                                std::shared_ptr<MemHandle>& dataHandle) {
+    const bool isDisplayStream = (streamId == mDisplayStream);
+    // TODO: dispatch to display
+    // NOTE(review): display stream packets reach the client whether or not
+    // the client enabled the stream - confirm this is intended until display
+    // dispatch exists.
+    if (isDisplayStream && mConfigBuilder.clientConfigEnablesDisplayStream()) {
+        return mClient->dispatchPacketToClient(streamId, dataHandle);
+    }
+    return mClient->dispatchPacketToClient(streamId, dataHandle);
+}
+
+/**
+ * Creates the input manager for the input config selected in |config|.
+ * Does nothing when input managers are disabled through the engine args.
+ *
+ * @return Status::INVALID_ARGUMENT when no input config is selected or the
+ *         graph descriptor does not offer it, Status::INTERNAL_ERROR when the
+ *         manager cannot be created, Status::SUCCESS otherwise.
+ */
+Status DefaultEngine::populateInputManagers(const ClientConfig& config) {
+    if (mIgnoreInputManager) {
+        return Status::SUCCESS;
+    }
+    int selectedId;
+    if (config.getInputConfigId(&selectedId) != Status::SUCCESS) {
+        return Status::INVALID_ARGUMENT;
+    }
+
+    for (const auto& inputIt : mGraphDescriptor.input_configs()) {
+        if (selectedId != inputIt.config_id()) {
+            continue;
+        }
+        proto::InputConfig inputDescriptor = inputIt;
+        std::shared_ptr<InputCallback> cb = std::make_shared<InputCallback>(
+                selectedId,
+                [this](int id) {
+                    std::string source = "InputManager:" + std::to_string(id);
+                    this->queueError(source, "", false);
+                },
+                [](int /*streamId */, const InputFrame& /* frame */) { return Status::SUCCESS; });
+        // Check the manager before registering it, so that a failed creation
+        // does not leave a null entry in the map.
+        auto manager = mInputFactory.createInputManager(inputDescriptor, cb);
+        if (manager == nullptr) {
+            LOG(ERROR) << "unable to create input manager for input config " << selectedId;
+            return Status::INTERNAL_ERROR;
+        }
+        mInputManagers[selectedId] = std::move(manager);
+        return Status::SUCCESS;
+    }
+    return Status::INVALID_ARGUMENT;
+}
+
+/**
+ * Engine Command Queue and Error Queue handling
+ */
+/**
+ * Body of the engine thread. Sleeps on mWakeLooper until a command is queued
+ * or an error is pending, then handles it. mEngineLock is held except while
+ * waiting. A pending error is handled first and discards all queued commands.
+ * The loop never exits.
+ */
+void DefaultEngine::processCommands() {
+    std::unique_lock<std::mutex> lock(mEngineLock);
+    while (true) {
+        LOG(INFO) << "Engine::Waiting on commands ";
+        mWakeLooper.wait(lock, [this] {
+            return !this->mCommandQueue.empty() || mCurrentPhaseError != nullptr;
+        });
+
+        if (mCurrentPhaseError) {
+            mErrorQueue.push(*mCurrentPhaseError);
+            processComponentError(mCurrentPhaseError->source);
+            mCurrentPhaseError.reset();
+            // Commands queued before the error are no longer valid.
+            mCommandQueue = std::queue<EngineCommand>();
+            continue;
+        }
+
+        const EngineCommand command = mCommandQueue.front();
+        mCommandQueue.pop();
+        switch (command.cmdType) {
+            case EngineCommand::Type::BROADCAST_CONFIG: {
+                LOG(INFO) << "Engine::Received broacast config request";
+                (void)broadcastClientConfig();
+                break;
+            }
+            case EngineCommand::Type::BROADCAST_START_RUN: {
+                LOG(INFO) << "Engine::Received broacast run request";
+                (void)broadcastStartRun();
+                break;
+            }
+            case EngineCommand::Type::BROADCAST_INITIATE_STOP: {
+                const bool fromClient =
+                        command.source.find("ClientInterface") != std::string::npos;
+                if (fromClient) {
+                    mStopFromClient = true;
+                }
+                LOG(INFO) << "Engine::Received broacast stop with flush request";
+                broadcastStopWithFlush();
+                break;
+            }
+            case EngineCommand::Type::POLL_COMPLETE: {
+                LOG(INFO) << "Engine::Received Poll stream managers for completion request";
+                // The reporting stream is skipped; all others must be stopped.
+                const int reportingStream = getStreamIdFromSource(command.source);
+                bool othersStopped = true;
+                for (auto& stream : mStreamManagers) {
+                    const bool isReporter = (stream.first == reportingStream);
+                    if (!isReporter &&
+                        stream.second->getState() != StreamManager::State::STOPPED) {
+                        othersStopped = false;
+                    }
+                }
+                if (othersStopped) {
+                    broadcastStopComplete();
+                }
+                break;
+            }
+        }
+    }
+}
+
+/**
+ * Reacts to a component error: halts the components when the runner is in
+ * the run or stop phase, and additionally resets them when |source| names
+ * the client interface.
+ */
+void DefaultEngine::processComponentError(std::string source) {
+    const bool graphActive = (mCurrentPhase == kRunPhase) || (mCurrentPhase == kStopPhase);
+    if (graphActive) {
+        broadcastHalt();
+    }
+    const bool clientIsSource = source.find("ClientInterface") != std::string::npos;
+    if (clientIsSource) {
+        broadcastReset();
+    }
+}
+
+/**
+ * Appends a command to the command queue and wakes the engine thread.
+ * Takes no lock itself.
+ */
+void DefaultEngine::queueCommand(std::string source, EngineCommand::Type type) {
+    const EngineCommand command(source, type);
+    mCommandQueue.push(command);
+    mWakeLooper.notify_all();
+}
+
+/**
+ * Records a component error for the current phase and wakes the engine
+ * thread. Acquires mEngineLock. Only the first error is kept while one is
+ * still pending.
+ */
+void DefaultEngine::queueError(std::string source, std::string msg, bool fatal) {
+    std::lock_guard<std::mutex> lock(mEngineLock);
+    if (mCurrentPhaseError) {
+        // current phase already has an error report
+        return;
+    }
+    mCurrentPhaseError = std::make_unique<ComponentError>(source, msg, mCurrentPhase, fatal);
+    mWakeLooper.notify_all();
+}
+
+/**
+ * InputCallback implementation
+ */
+// Stores copies of the error callback and the frame handler together with
+// the id of the input they belong to.
+InputCallback::InputCallback(int id, const std::function<void(int)>&& cb,
+                             const std::function<Status(int, const InputFrame&)>&& packetCb)
+    : mErrorCallback(cb),
+      mPacketHandler(packetCb),
+      mInputId(id) {
+}
+
+// Passes the frame to the packet handler supplied at construction.
+Status InputCallback::dispatchInputFrame(int streamId, const InputFrame& frame) {
+    const Status handlerStatus = mPacketHandler(streamId, frame);
+    return handlerStatus;
+}
+// Reports an error for this input by invoking the error callback with its id.
+void InputCallback::notifyInputError() {
+    const int failedInput = mInputId;
+    mErrorCallback(failedInput);
+}
+
+/**
+ * StreamCallback implementation
+ */
+// Stores copies of the end-of-stream, error and packet handlers.
+StreamCallback::StreamCallback(
+        const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
+        const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler)
+    : mErrorHandler(errorCb),
+      mEndOfStreamHandler(eos),
+      mPacketHandler(packetHandler) {
+}
+
+// Passes the error message to the error handler supplied at construction.
+void StreamCallback::notifyError(std::string msg) {
+    const std::string& message = msg;
+    mErrorHandler(message);
+}
+
+// Invokes the end-of-stream handler supplied at construction.
+void StreamCallback::notifyEndOfStream() {
+    mEndOfStreamHandler();
+}
+
+// Passes the packet to the packet handler supplied at construction.
+Status StreamCallback::dispatchPacket(const std::shared_ptr<MemHandle>& packet) {
+    const Status handlerStatus = mPacketHandler(packet);
+    return handlerStatus;
+}
+
+} // namespace engine
+} // namespace runner
+} // namespace computepipe
+} // namespace automotive
+} // namespace android
diff --git a/computepipe/runner/engine/DefaultEngine.h b/computepipe/runner/engine/DefaultEngine.h
new file mode 100644
index 0000000..5576914
--- /dev/null
+++ b/computepipe/runner/engine/DefaultEngine.h
@@ -0,0 +1,327 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef COMPUTEPIPE_RUNNER_ENGINE_DEFAULT_H
+#define COMPUTEPIPE_RUNNER_ENGINE_DEFAULT_H
+
+#include <condition_variable>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "ConfigBuilder.h"
+#include "InputManager.h"
+#include "Options.pb.h"
+#include "RunnerEngine.h"
+#include "StreamManager.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+class InputCallback;
+
+/**
+ * EngineCommand represents client requests or error events.
+ * Each command is queued, and processed by the engine thread.
+ * |source| names the component that issued the command.
+ */
+struct EngineCommand {
+    enum Type {
+        BROADCAST_CONFIG = 0,
+        BROADCAST_START_RUN,
+        BROADCAST_INITIATE_STOP,
+        POLL_COMPLETE,
+    };
+    std::string source;
+    Type cmdType;
+    // |s| is taken by value and moved into place to avoid a second copy.
+    explicit EngineCommand(std::string s, Type t) : source(std::move(s)), cmdType(t) {
+    }
+};
+
+/**
+ * Component Error represents the type of error reported by a component.
+ * It records the reporting component, the error message and the phase the
+ * runner was in when the error was reported.
+ */
+struct ComponentError {
+    bool isFatal;
+    std::string source;
+    std::string message;
+    std::string currentPhase;
+    // The strings are taken by value and moved into place to avoid a second copy.
+    explicit ComponentError(std::string s, std::string m, std::string p, bool fatal = false)
+        : isFatal(fatal),
+          source(std::move(s)),
+          message(std::move(m)),
+          currentPhase(std::move(p)) {
+    }
+};
+
+/**
+ * Default Engine implementation.
+ * Takes ownership of externally instantiated graph & client interface
+ * instances. Brings the runner online. Manages components.
+ * Responds to client events.
+ */
+class DefaultEngine : public RunnerEngine {
+ public:
+ static constexpr char kDisplayStreamId[] = "display_stream:";
+ static constexpr char kNoInputManager[] = "no_input_manager";
+ static constexpr char kResetPhase[] = "Reset";
+ static constexpr char kConfigPhase[] = "Config";
+ static constexpr char kRunPhase[] = "Running";
+ static constexpr char kStopPhase[] = "Stopping";
+ /**
+ * Methods from Runner Engine to override
+ */
+ Status setArgs(std::string engine_args) override;
+ void setClientInterface(std::unique_ptr<client_interface::ClientInterface>&& client) override;
+ void setPrebuiltGraph(std::unique_ptr<graph::PrebuiltGraph>&& graph) override;
+ Status activate() override;
+ /**
+ * Methods from ClientEngineInterface to override
+ */
+ Status processClientConfigUpdate(const proto::ConfigurationCommand& command) override;
+ Status processClientCommand(const proto::ControlCommand& command) override;
+ Status freePacket(const std::shared_ptr<MemHandle>& packet) override;
+ /**
+ * Methods from PrebuiltEngineInterface to override
+ */
+ void DispatchPixelData(int streamId, int64_t timestamp, const uint8_t* pixels, int width,
+ int height, int step, PixelFormat format) override;
+
+ void DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) override;
+
+ void DispatchGraphTerminationMessage(Status s, std::string&& msg) override;
+
+ private:
+ // TODO: b/147704051 Add thread analyzer annotations
+ /**
+ * BroadCast Client config to all components. If all components handle the
+ * notification correctly, then broadcast transition complete.
+ * Successful return from this function implies runner has transitioned to
+ * configuration done.
+ * @Lock held mEngineLock
+ */
+ Status broadcastClientConfig();
+ /**
+ * Abort an ongoing attempt to apply client configs.
+ * @Lock held mEngineLock
+ */
+ void abortClientConfig(const ClientConfig& config, bool resetGraph = false);
+ /**
+ * BroadCast start to all components. The order of entry into run phase
+ * notification delivery is downstream components to upstream components.
+ * If all components handle the entry notification correctly then broadcast
+ * transition complete notification again from down stream to upstream.
+ * Successful return from this function implies runner has transitioned to
+ * running.
+ * @Lock held mEngineLock
+ */
+ Status broadcastStartRun();
+ /**
+ * Broadcast abort of a started run to the given components. This gets called if, during
+ * broadcastStartRun(), one of the components failed to set itself up for the run. In that case
+ * the components that had successfully acknowledged broadcastStartRun()
+ * need to be told to abort. The current phase is not modified by this call,
+ * so the runner stays in config phase.
+ * @Lock held mEngineLock
+ */
+ void broadcastAbortRun(const std::vector<int>& streamIds, const std::vector<int>& inputIds,
+ bool graph = false);
+
+ /**
+ * Broadcast stop with flush to all components. The stop with flush phase
+ * entry notification is sent from in the order of upstream to downstream.
+ * A successful return can leave the runner in stopping phase.
+ * We transition to stop completely, once all inflight traffic has been drained at a later
+ * point, identified by stream managers.
+ * @Lock held mEngineLock
+ */
+ Status broadcastStopWithFlush();
+ /**
+ * Broadcast transition to stop complete. This is a confirmation to all
+ * components that stop has finished. At the end of this we transition back
+ * to config phase.
+ * @Lock held mEngineLock
+ */
+ Status broadcastStopComplete();
+ /**
+ * Broadcast halt to all components. All inflight traffic is dropped.
+ * Successful return from this function implies all components have
+ * exited run phase and are back in config phase.
+ * @Lock held mEngineLock
+ */
+ void broadcastHalt();
+ /**
+ * Broadcast reset to all components. All components drop client
+ * specific configuration and transition to reset state. For RAII
+ * components, they are freed at this point. Also resets mConfigBuilder
+ * to its original state. Successful return puts the runner in reset phase.
+ * @Lock held mEngineLock
+ */
+ void broadcastReset();
+ /**
+ * Populate stream managers for a given client config. For each client
+ * selected output config, we generate stream managers. During reset phase
+ * we clear out any previously constructed stream managers. This should be
+ * invoked only in response to applyConfigs() issued by client.
+ * @Lock held mEngineLock
+ */
+ Status populateStreamManagers(const ClientConfig& config);
+ /**
+ * Populate input managers for a given client config. For the client
+ * selected input config, we generate an input manager. During reset phase
+ * we clear out any previously constructed input managers. This should be
+ * invoked only in response to applyConfigs() issued by client.
+ * @Lock held mEngineLock
+ */
+ Status populateInputManagers(const ClientConfig& config);
+ /**
+ * Helper method to forward packet to client interface for transmission
+ */
+ Status forwardOutputDataToClient(int streamId, std::shared_ptr<MemHandle>& handle);
+ /**
+ * Helper to handle error notification from components, in the errorQueue.
+ * In case the source of the error is client interface, it will
+ * broadcastReset().
+     * This is called in the mEngineThread when processing an entry from the
+     * errorQueue.
+ * @Lock acquires mEngineLock.
+ */
+ void processComponentError(std::string source);
+ /**
+ * Method run by the engine thread to process commands.
+ * Uses condition variable. Acquires lock mEngineLock to access
+ * command queues.
+ */
+ void processCommands();
+ /**
+ * Method run by external components to queue commands to the engine.
+ * Must be called with mEngineLock held. Wakes up the looper.
+ */
+ void queueCommand(std::string source, EngineCommand::Type type);
+ /**
+ * Method called by component reporting error.
+ * This will acquire mEngineLock and queue the error.
+ */
+ void queueError(std::string source, std::string msg, bool fatal);
+ /**
+ * client interface handle
+ */
+ std::unique_ptr<client_interface::ClientInterface> mClient = nullptr;
+ /**
+ * builder to build up client config incrementally.
+ */
+ ConfigBuilder mConfigBuilder;
+ /**
+ * Stream management members
+ */
+ std::map<int, std::unique_ptr<stream_manager::StreamManager>> mStreamManagers;
+ stream_manager::StreamManagerFactory mStreamFactory;
+ /**
+ * Input manager members
+ */
+ std::map<int, std::unique_ptr<input_manager::InputManager>> mInputManagers;
+ input_manager::InputManagerFactory mInputFactory;
+ /**
+ * stream to dump to display for debug purposes
+ */
+ int32_t mDisplayStream = ClientConfig::kInvalidId;
+ /**
+ * graph descriptor
+ */
+ proto::Options mGraphDescriptor;
+ std::unique_ptr<graph::PrebuiltGraph> mGraph;
+ /**
+ * stop signal source
+ */
+ bool mStopFromClient = true;
+ /**
+ * Phase management members
+ */
+ std::string mCurrentPhase = kResetPhase;
+ std::mutex mEngineLock;
+ /**
+ * Used to track the first error occurrence for a given phase.
+ */
+ std::unique_ptr<ComponentError> mCurrentPhaseError = nullptr;
+
+ /**
+ * Queue for client commands
+ */
+ std::queue<EngineCommand> mCommandQueue;
+ /**
+ * Queue for error notifications
+ */
+ std::queue<ComponentError> mErrorQueue;
+ /**
+ * Engine looper
+ */
+ std::unique_ptr<std::thread> mEngineThread;
+ /**
+ * Condition variable for looper
+ */
+ std::condition_variable mWakeLooper;
+ /**
+ * ignore input manager allocation
+ */
+ bool mIgnoreInputManager = false;
+};
+
+/**
+ * Handles callbacks from individual stream managers as specified in StreamEngineInterface.
+ * Constructed from three callables whose signatures match the private std::function members.
+ */
+class StreamCallback : public stream_manager::StreamEngineInterface {
+  public:
+    explicit StreamCallback(  // NOTE(review): const&& arguments cannot be moved from
+            const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
+            const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler);
+    void notifyEndOfStream() override;
+    void notifyError(std::string msg) override;
+    Status dispatchPacket(const std::shared_ptr<MemHandle>& outData) override;
+    ~StreamCallback() = default;
+
+  private:
+    std::function<void(std::string)> mErrorHandler;  // declared first; ctor takes eos first
+    std::function<void()> mEndOfStreamHandler;
+    std::function<Status(const std::shared_ptr<MemHandle>&)> mPacketHandler;
+};
+
+/**
+ * Handles callbacks from input managers. Forwards frames to the graph.
+ * Only used if graph implementation is local.
+ */
+class InputCallback : public input_manager::InputEngineInterface {
+  public:
+    explicit InputCallback(int id, const std::function<void(int)>&& cb,  // const&&: not movable
+                           const std::function<Status(int, const InputFrame&)>&& packetCb);
+    Status dispatchInputFrame(int streamId, const InputFrame& frame) override;
+    void notifyInputError() override;
+    ~InputCallback() = default;
+
+  private:
+    std::function<void(int)> mErrorCallback;  // same signature as the constructor's cb
+    std::function<Status(int, const InputFrame&)> mPacketHandler;  // same signature as packetCb
+    int mInputId;  // NOTE(review): presumably the constructor's id argument - confirm
+};
+
+} // namespace engine
+} // namespace runner
+} // namespace computepipe
+} // namespace automotive
+} // namespace android
+
+#endif
diff --git a/computepipe/runner/engine/Factory.cpp b/computepipe/runner/engine/Factory.cpp
new file mode 100644
index 0000000..67b3cd3
--- /dev/null
+++ b/computepipe/runner/engine/Factory.cpp
@@ -0,0 +1,46 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "DefaultEngine.h"
+#include "RunnerEngine.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+namespace {
+// Builds a DefaultEngine from engine_args; returns nullptr unless setArgs() reports SUCCESS.
+std::unique_ptr<DefaultEngine> createDefaultEngine(std::string engine_args) {
+    auto defaultEngine = std::make_unique<DefaultEngine>();
+    const Status argStatus = defaultEngine->setArgs(engine_args);
+    if (argStatus != Status::SUCCESS) return nullptr;
+    return defaultEngine;
+}
+} // namespace
+
+// Resolves an engine name: kDefault yields a DefaultEngine, any other name yields nullptr.
+std::unique_ptr<RunnerEngine> RunnerEngineFactory::createRunnerEngine(std::string engine,
+                                                                      std::string engine_args) {
+    const bool wantsDefault = (engine == kDefault);
+    if (!wantsDefault) return nullptr;
+    return createDefaultEngine(engine_args);
+}
+
+} // namespace engine
+} // namespace runner
+} // namespace computepipe
+} // namespace automotive
+} // namespace android
diff --git a/computepipe/runner/engine/include/RunnerEngine.h b/computepipe/runner/engine/include/RunnerEngine.h
new file mode 100644
index 0000000..778099d
--- /dev/null
+++ b/computepipe/runner/engine/include/RunnerEngine.h
@@ -0,0 +1,71 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef COMPUTEPIPE_RUNNER_ENGINE_H
+#define COMPUTEPIPE_RUNNER_ENGINE_H
+
+#include <memory>
+#include <string>
+
+#include "ClientInterface.h"
+#include "PrebuiltGraph.h"
+#include "types/Status.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+/**
+ * Interface into an engine. It derives from the client -> engine and prebuilt -> engine
+ * interfaces so that every engine implementation must also implement those interfaces.
+ * NOTE(review): no virtual destructor declared here - confirm the base interfaces have one.
+ */
+class RunnerEngine : public client_interface::ClientEngineInterface,
+                     public graph::PrebuiltEngineInterface {
+  public:
+    /**
+     * Passes any args that a given engine instance needs in order to configure itself.
+     */
+    virtual Status setArgs(std::string engine_args) = 0;
+    /**
+     * Set the client and the prebuilt graph instances. Both arrive as rvalue unique_ptrs.
+     */
+    virtual void setClientInterface(std::unique_ptr<client_interface::ClientInterface>&& client) = 0;
+
+    virtual void setPrebuiltGraph(std::unique_ptr<graph::PrebuiltGraph>&& graph) = 0;
+    /**
+     * Activates the client interface and advertises to the rest of the world
+     * that the runner is online.
+     */
+    virtual Status activate() = 0;
+};
+
+class RunnerEngineFactory {
+  public:
+    static constexpr char kDefault[] = "default_engine";  // name that selects DefaultEngine
+    RunnerEngineFactory() = default;
+    RunnerEngineFactory(const RunnerEngineFactory&) = delete;             // not copyable
+    RunnerEngineFactory& operator=(const RunnerEngineFactory&) = delete;  // not assignable
+    std::unique_ptr<RunnerEngine> createRunnerEngine(std::string engine, std::string engine_args);
+};
+
+} // namespace engine
+} // namespace runner
+} // namespace computepipe
+} // namespace automotive
+} // namespace android
+
+#endif
diff --git a/computepipe/runner/graph/include/PrebuiltGraph.h b/computepipe/runner/graph/include/PrebuiltGraph.h
index f96a8ad..66c069b 100644
--- a/computepipe/runner/graph/include/PrebuiltGraph.h
+++ b/computepipe/runner/graph/include/PrebuiltGraph.h
@@ -51,8 +51,6 @@
// No copy or move constructors or operators are available.
PrebuiltGraph(const PrebuiltGraph&) = delete;
PrebuiltGraph& operator=(const PrebuiltGraph&) = delete;
- PrebuiltGraph(PrebuiltGraph&&) = delete;
- PrebuiltGraph& operator=(PrebuiltGraph&&) = delete;
// Override RunnerComponent interface functions for applying configs,
// starting the graph and stopping the graph.
diff --git a/computepipe/runner/stream_manager/SemanticManager.cpp b/computepipe/runner/stream_manager/SemanticManager.cpp
index fc73c74..abb4093 100644
--- a/computepipe/runner/stream_manager/SemanticManager.cpp
+++ b/computepipe/runner/stream_manager/SemanticManager.cpp
@@ -83,7 +83,7 @@
std::lock_guard<std::mutex> lock(mStateLock);
if (mState == CONFIG_DONE && e.isPhaseEntry()) {
mState = RUNNING;
- return ILLEGAL_STATE;
+ return SUCCESS;
}
if (mState == RESET) {
/* Cannot get to running phase from reset state without config phase*/