Add Default Runner Engine Implementation

Bug: 146898311
Test: sample client and graph implementations
Change-Id: I247d46f53fb4175de1a5c5dc208d460ec4c4939a
diff --git a/computepipe/products/computepipe.mk b/computepipe/products/computepipe.mk
index eb8efc1..d5fe7f7 100644
--- a/computepipe/products/computepipe.mk
+++ b/computepipe/products/computepipe.mk
@@ -18,6 +18,16 @@
 # Enable computepipe router
 PRODUCT_PACKAGES += android.automotive.computepipe.router@1.0
 
+# Enable computepipe runner engine library
+PRODUCT_PACKAGES += computepipe_runner_engine
+
+# Enable computepipe runner engine client interface library
+PRODUCT_PACKAGES += computepipe_client_interface
+
+# Enable computepipe runner engine prebuilt graph library
+PRODUCT_PACKAGES += computepipe_prebuilt_graph
+
+
 # Selinux public policies for computepipe services
 BOARD_PLAT_PUBLIC_SEPOLICY_DIR += packages/services/Car/computepipe/sepolicy/public
 
diff --git a/computepipe/runner/Android.bp b/computepipe/runner/Android.bp
index d17ab03..94e5928 100644
--- a/computepipe/runner/Android.bp
+++ b/computepipe/runner/Android.bp
@@ -18,10 +18,6 @@
     static_libs: [
         "libcomputepipeprotos",
     ],
-    visibility: [
-        "//packages/services/Car/computepipe/runner:__subpackages__",
-        "//packages/services/Car/computepipe/tests:__subpackages__",
-    ],
 }
 
 cc_library {
diff --git a/computepipe/runner/engine/Android.bp b/computepipe/runner/engine/Android.bp
index 89c5d29..ca58821 100644
--- a/computepipe/runner/engine/Android.bp
+++ b/computepipe/runner/engine/Android.bp
@@ -16,12 +16,45 @@
     name: "computepipe_runner_engine",
     srcs: [
         "ConfigBuilder.cpp",
+	"DefaultEngine.cpp",
+	"Factory.cpp",
     ],
+    export_include_dirs: ["include"],
     header_libs: [
         "computepipe_runner_includes",
     ],
-    shared_libs: [
+    static_libs: [
+        "libcomputepipeprotos",
         "computepipe_runner_component",
+        "computepipe_input_manager",
+        "libcomputepipe_stream_manager",
+    ],
+    shared_libs: [
+        "computepipe_client_interface",
+        "computepipe_prebuilt_graph",
+        "libprotobuf-cpp-lite",
+        "android.hardware.automotive.evs@1.0",
+        "libbase",
+        "libcutils",
+        "libdl",
+        "libevssupport",
+        "libhardware",
+        "libhidlbase",
+        "liblog",
+        "libpng",
+        "libprotobuf-cpp-lite",
+        "libui",
+        "libutils",
+        "libEGL",
+        "libGLESv2",
+    ],
+    cflags: [
+        "-D_LIBCPP_ENABLE_THREAD_SAFETY_ANNOTATIONS",
+        "-Wall",
+        "-Werror",
+        "-Wunused",
+        "-Wunreachable-code",
+        "-Wthread-safety",
     ],
     include_dirs: [
         "packages/services/Car/computepipe",
diff --git a/computepipe/runner/engine/ConfigBuilder.cpp b/computepipe/runner/engine/ConfigBuilder.cpp
index 38ed417..99a3b6b 100644
--- a/computepipe/runner/engine/ConfigBuilder.cpp
+++ b/computepipe/runner/engine/ConfigBuilder.cpp
@@ -20,12 +20,24 @@
 namespace runner {
 namespace engine {
 
+void ConfigBuilder::setDebugDisplayStream(int id) {
+    mDisplayStream = id;
+    mOutputConfig.emplace(id, 1);
+}
+
+bool ConfigBuilder::clientConfigEnablesDisplayStream() {
+    return mConfigHasDisplayStream;
+}
+
 ConfigBuilder& ConfigBuilder::updateInputConfigOption(int id) {
     mInputConfigId = id;
     return *this;
 }
 
 ConfigBuilder& ConfigBuilder::updateOutputStreamOption(int id, int maxInFlightPackets) {
+    if (id == mDisplayStream) {
+        mConfigHasDisplayStream = true;
+    }
     mOutputConfig.emplace(id, maxInFlightPackets);
     return *this;
 }
@@ -54,6 +66,7 @@
     mTerminationId = ClientConfig::kInvalidId;
     mOffloadId = ClientConfig::kInvalidId;
     mOutputConfig.clear();
+    mConfigHasDisplayStream = false;
     return *this;
 }
 
diff --git a/computepipe/runner/engine/ConfigBuilder.h b/computepipe/runner/engine/ConfigBuilder.h
index be0ac87..7937c1d 100644
--- a/computepipe/runner/engine/ConfigBuilder.h
+++ b/computepipe/runner/engine/ConfigBuilder.h
@@ -27,6 +27,14 @@
 class ConfigBuilder {
   public:
     /**
+     * Set debug display stream in the final client config
+     */
+    void setDebugDisplayStream(int id);
+    /**
+     * Does client explicitly enable display stream
+     */
+    bool clientConfigEnablesDisplayStream();
+    /**
      * Update current input option
      */
     ConfigBuilder& updateInputConfigOption(int id);
@@ -56,9 +64,11 @@
     ConfigBuilder& reset();
 
   private:
+    int mDisplayStream;
     int mInputConfigId;
     int mOffloadId;
     int mTerminationId;
+    bool mConfigHasDisplayStream = false;
     std::map<int, int> mOutputConfig;
     std::string mOptionalConfig;
 };
diff --git a/computepipe/runner/engine/DefaultEngine.cpp b/computepipe/runner/engine/DefaultEngine.cpp
new file mode 100644
index 0000000..92eabf2
--- /dev/null
+++ b/computepipe/runner/engine/DefaultEngine.cpp
@@ -0,0 +1,621 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "DefaultEngine.h"
+
+#include <android-base/logging.h>
+
+#include <algorithm>
+#include <cassert>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "ClientInterface.h"
+#include "EventGenerator.h"
+#include "InputFrame.h"
+#include "PrebuiltGraph.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+using android::automotive::computepipe::graph::PrebuiltGraph;
+using android::automotive::computepipe::runner::client_interface::ClientInterface;
+using android::automotive::computepipe::runner::generator::DefaultEvent;
+using android::automotive::computepipe::runner::input_manager::InputEngineInterface;
+using android::automotive::computepipe::runner::stream_manager::StreamEngineInterface;
+using android::automotive::computepipe::runner::stream_manager::StreamManager;
+
+namespace {
+
+int getStreamIdFromSource(std::string source) {
+    auto pos = source.find(":");
+    return std::stoi(source.substr(pos + 1));
+}
+}  // namespace
+
+void DefaultEngine::setClientInterface(std::unique_ptr<ClientInterface>&& client) {
+    mClient = std::move(client);
+}
+
+void DefaultEngine::setPrebuiltGraph(std::unique_ptr<PrebuiltGraph>&& graph) {
+    mGraph = std::move(graph);
+    mGraphDescriptor = mGraph->GetSupportedGraphConfigs();
+}
+
+Status DefaultEngine::setArgs(std::string engine_args) {
+    auto pos = engine_args.find(kNoInputManager);
+    if (pos != std::string::npos) {
+        mIgnoreInputManager = true;
+    }
+    pos = engine_args.find(kDisplayStreamId);
+    if (pos == std::string::npos) {
+        return Status::SUCCESS;
+    }
+    mDisplayStream = std::stoi(engine_args.substr(pos + strlen(kDisplayStreamId)));
+    mConfigBuilder.setDebugDisplayStream(mDisplayStream);
+    return Status::SUCCESS;
+}
+
+Status DefaultEngine::activate() {
+    mConfigBuilder.reset();
+    mEngineThread = std::make_unique<std::thread>(&DefaultEngine::processCommands, this);
+    return mClient->activate();
+}
+
+Status DefaultEngine::processClientConfigUpdate(const proto::ConfigurationCommand& command) {
+    // TODO check current phase
+    std::lock_guard<std::mutex> lock(mEngineLock);
+    if (mCurrentPhase != kResetPhase) {
+        return Status::ILLEGAL_STATE;
+    }
+    if (command.has_set_input_source()) {
+        mConfigBuilder =
+            mConfigBuilder.updateInputConfigOption(command.set_input_source().source_id());
+    } else if (command.has_set_termination_option()) {
+        mConfigBuilder = mConfigBuilder.updateTerminationOption(
+            command.set_termination_option().termination_option_id());
+    } else if (command.has_set_output_stream()) {
+        mConfigBuilder = mConfigBuilder.updateOutputStreamOption(
+            command.set_output_stream().stream_id(),
+            command.set_output_stream().max_inflight_packets_count());
+    } else if (command.has_set_offload_offload()) {
+        mConfigBuilder =
+            mConfigBuilder.updateOffloadOption(command.set_offload_offload().offload_option_id());
+    } else {
+        return SUCCESS;
+    }
+    return Status::SUCCESS;
+}
+
+Status DefaultEngine::processClientCommand(const proto::ControlCommand& command) {
+    // TODO check current phase
+    std::lock_guard<std::mutex> lock(mEngineLock);
+
+    if (command.has_apply_configs()) {
+        if (mCurrentPhase != kResetPhase) {
+            return Status::ILLEGAL_STATE;
+        }
+        queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_CONFIG);
+        return Status::SUCCESS;
+    }
+    if (command.has_start_graph()) {
+        if (mCurrentPhase != kConfigPhase) {
+            return Status::ILLEGAL_STATE;
+        }
+        queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_START_RUN);
+        return Status::SUCCESS;
+    }
+    if (command.has_stop_graph()) {
+        if (mCurrentPhase != kRunPhase) {
+            return Status::ILLEGAL_STATE;
+        }
+        mStopFromClient = true;
+        queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_INITIATE_STOP);
+        return Status::SUCCESS;
+    }
+    if (command.has_death_notification()) {
+        mErrorQueue.push(ComponentError("ClientInterface", "Client death", mCurrentPhase, false));
+        mWakeLooper.notify_all();
+        return Status::SUCCESS;
+    }
+    return Status::SUCCESS;
+}
+
+Status DefaultEngine::freePacket(const std::shared_ptr<MemHandle>& packet) {
+    int streamId = packet->getStreamId();
+    mStreamManagers[streamId]->freePacket(packet);
+    return Status::SUCCESS;
+}
+
+/**
+ * Methods from PrebuiltEngineInterface
+ */
+void DefaultEngine::DispatchPixelData(int /* streamId */, int64_t /* timestamp */,
+                                      const uint8_t* /* pixels */, int /* width */,
+                                      int /* height */, int /* step */, PixelFormat /* format*/) {
+    // TODO: b/147975150 Add pixel stream forwarding to stream manager.
+    return;
+}
+
+void DefaultEngine::DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) {
+    LOG(INFO) << "Engine::Received data for stream  " << streamId << " with timestamp " << timestamp;
+    if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
+        LOG(ERROR) << "Engine::Received bad stream id from prebuilt graph";
+    }
+    std::string data(output);
+    mStreamManagers[streamId]->queuePacket(data.c_str(), data.size(), timestamp);
+}
+
+void DefaultEngine::DispatchGraphTerminationMessage(Status s, std::string&& msg) {
+    std::lock_guard<std::mutex> lock(mEngineLock);
+    if (s == SUCCESS) {
+        if (mCurrentPhase == kRunPhase) {
+            queueCommand("PrebuiltGraph", EngineCommand::Type::BROADCAST_INITIATE_STOP);
+        } else {
+            LOG(WARNING) << "Graph termination when not in run phase";
+        }
+    } else {
+        std::string error = msg;
+        queueError("PrebuiltGraph", error, false);
+    }
+}
+
+Status DefaultEngine::broadcastClientConfig() {
+    ClientConfig config = mConfigBuilder.emitClientOptions();
+
+    LOG(INFO) << "Engine::create stream manager";
+    Status ret = populateStreamManagers(config);
+    if (ret != Status::SUCCESS) {
+        return ret;
+    }
+
+    if (mGraph) {
+        ret = populateInputManagers(config);
+        if (ret != Status::SUCCESS) {
+            abortClientConfig(config);
+            return ret;
+        }
+
+        LOG(INFO) << "Engine::send client config entry to graph";
+        config.setPhaseState(PhaseState::ENTRY);
+        ret = mGraph->handleConfigPhase(config);
+        if (ret != Status::SUCCESS) {
+            abortClientConfig(config);
+            return ret;
+        }
+        LOG(INFO) << "Engine::send client config transition complete to graph";
+        config.setPhaseState(PhaseState::TRANSITION_COMPLETE);
+        ret = mGraph->handleConfigPhase(config);
+        if (ret != Status::SUCCESS) {
+            abortClientConfig(config);
+            return ret;
+        }
+    }
+    LOG(INFO) << "Engine::Graph configured";
+    // TODO add handling for remote graph
+    ret = mClient->handleConfigPhase(config);
+    if (ret != Status::SUCCESS) {
+        config.setPhaseState(PhaseState::ABORTED);
+        abortClientConfig(config, true);
+        return ret;
+    }
+    mCurrentPhase = kConfigPhase;
+    return Status::SUCCESS;
+}
+
+void DefaultEngine::abortClientConfig(const ClientConfig& config, bool resetGraph) {
+    mStreamManagers.clear();
+    mInputManagers.clear();
+    if (resetGraph && mGraph) {
+        (void)mGraph->handleConfigPhase(config);
+    }
+    (void)mClient->handleConfigPhase(config);
+    // TODO add handling for remote graph
+}
+
+Status DefaultEngine::broadcastStartRun() {
+    DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RUN);
+
+    std::vector<int> successfulStreams;
+    std::vector<int> successfulInputs;
+    for (auto& it : mStreamManagers) {
+        LOG(INFO) << "Engine::sending start run to stream manager " << it.first << " failed";
+        if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
+            LOG(ERROR) << "Engine::failure to enter run phase for stream " << it.first;
+            broadcastAbortRun(successfulStreams, successfulInputs);
+            return Status::INTERNAL_ERROR;
+        }
+        successfulStreams.push_back(it.first);
+    }
+    // TODO: send to remote
+    Status ret;
+    if (mGraph) {
+        LOG(INFO) << "Engine::sending start run to prebuilt";
+        ret = mGraph->handleExecutionPhase(runEvent);
+        if (ret != Status::SUCCESS) {
+            broadcastAbortRun(successfulStreams, successfulInputs);
+        }
+        for (auto& it : mInputManagers) {
+            if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
+                LOG(ERROR) << "Engine::failure to enter run phase for input manager " << it.first;
+                broadcastAbortRun(successfulStreams, successfulInputs, true);
+                return Status::INTERNAL_ERROR;
+            }
+            successfulInputs.push_back(it.first);
+        }
+    }
+    runEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RUN);
+    LOG(INFO) << "Engine::sending run transition complete to client";
+    ret = mClient->handleExecutionPhase(runEvent);
+    if (ret != Status::SUCCESS) {
+        LOG(ERROR) << "Engine::client failure to acknowledge transition to run complete ";
+        broadcastAbortRun(successfulStreams, successfulInputs, true);
+        return ret;
+    }
+    for (auto& it : mStreamManagers) {
+        (void)it.second->handleExecutionPhase(runEvent);
+    }
+    // TODO: send to remote
+    if (mGraph) {
+        LOG(INFO) << "Engine::sending run transition complete to prebuilt";
+        (void)mGraph->handleExecutionPhase(runEvent);
+        for (auto& it : mInputManagers) {
+            (void)it.second->handleExecutionPhase(runEvent);
+        }
+    }
+    LOG(INFO) << "Engine::Running";
+    mCurrentPhase = kRunPhase;
+    return Status::SUCCESS;
+}
+
+void DefaultEngine::broadcastAbortRun(const std::vector<int>& streamIds,
+                                      const std::vector<int>& inputIds, bool abortGraph) {
+    DefaultEvent runEvent = DefaultEvent::generateAbortEvent(DefaultEvent::RUN);
+    std::for_each(streamIds.begin(), streamIds.end(), [this, runEvent](int id) {
+        (void)this->mStreamManagers[id]->handleExecutionPhase(runEvent);
+    });
+    std::for_each(inputIds.begin(), inputIds.end(), [this, runEvent](int id) {
+        (void)this->mInputManagers[id]->handleExecutionPhase(runEvent);
+    });
+    if (abortGraph) {
+        if (mGraph) {
+            (void)mGraph->handleExecutionPhase(runEvent);
+        }
+    }
+}
+
+Status DefaultEngine::broadcastStopWithFlush() {
+    DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_WITH_FLUSH);
+
+    if (mGraph) {
+        for (auto& it : mInputManagers) {
+            (void)it.second->handleStopWithFlushPhase(runEvent);
+        }
+        if (mStopFromClient) {
+            (void)mGraph->handleStopWithFlushPhase(runEvent);
+        }
+    }
+    // TODO: send to remote.
+    for (auto& it : mStreamManagers) {
+        (void)it.second->handleStopWithFlushPhase(runEvent);
+    }
+    if (!mStopFromClient) {
+        (void)mClient->handleStopWithFlushPhase(runEvent);
+    }
+    mCurrentPhase = kStopPhase;
+    return Status::SUCCESS;
+}
+
+Status DefaultEngine::broadcastStopComplete() {
+    DefaultEvent runEvent =
+        DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_WITH_FLUSH);
+    if (mGraph) {
+        for (auto& it : mInputManagers) {
+            (void)it.second->handleStopWithFlushPhase(runEvent);
+        }
+        (void)mGraph->handleStopWithFlushPhase(runEvent);
+    }
+    // TODO: send to remote.
+    for (auto& it : mStreamManagers) {
+        (void)it.second->handleStopWithFlushPhase(runEvent);
+    }
+    (void)mClient->handleStopWithFlushPhase(runEvent);
+    mCurrentPhase = kConfigPhase;
+    return Status::SUCCESS;
+}
+
+void DefaultEngine::broadcastHalt() {
+    DefaultEvent stopEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_IMMEDIATE);
+
+    if (mGraph) {
+        for (auto& it : mInputManagers) {
+            (void)it.second->handleStopImmediatePhase(stopEvent);
+        }
+
+        if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos)) {
+            (void)mGraph->handleStopImmediatePhase(stopEvent);
+        }
+    }
+    // TODO: send to remote if client was source.
+    for (auto& it : mStreamManagers) {
+        (void)it.second->handleStopImmediatePhase(stopEvent);
+    }
+    if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
+        (void)mClient->handleStopImmediatePhase(stopEvent);
+    }
+
+    stopEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_IMMEDIATE);
+    if (mGraph) {
+        for (auto& it : mInputManagers) {
+            (void)it.second->handleStopImmediatePhase(stopEvent);
+        }
+        // TODO: send to graph or remote if client was source.
+
+        if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos) && mGraph) {
+            (void)mGraph->handleStopImmediatePhase(stopEvent);
+        }
+    }
+    for (auto& it : mStreamManagers) {
+        (void)it.second->handleStopImmediatePhase(stopEvent);
+    }
+    if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
+        (void)mClient->handleStopImmediatePhase(stopEvent);
+    }
+    mCurrentPhase = kConfigPhase;
+}
+
+void DefaultEngine::broadcastReset() {
+    mStreamManagers.clear();
+    mInputManagers.clear();
+    DefaultEvent resetEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RESET);
+    (void)mClient->handleResetPhase(resetEvent);
+    if (mGraph) {
+        (void)mGraph->handleResetPhase(resetEvent);
+    }
+    resetEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RESET);
+    (void)mClient->handleResetPhase(resetEvent);
+    if (mGraph) {
+        (void)mGraph->handleResetPhase(resetEvent);
+    }
+    // TODO: send to remote runner
+    mConfigBuilder.reset();
+    mCurrentPhase = kResetPhase;
+    mStopFromClient = false;
+}
+
+Status DefaultEngine::populateStreamManagers(const ClientConfig& config) {
+    std::map<int, int> outputConfigs;
+    if (config.getOutputStreamConfigs(outputConfigs) != Status::SUCCESS) {
+        return Status::ILLEGAL_STATE;
+    }
+    for (auto& configIt : outputConfigs) {
+        int streamId = configIt.first;
+        int maxInFlightPackets = configIt.second;
+        proto::OutputConfig outputDescriptor;
+        // find the output descriptor for requested stream id
+        bool foundDesc = false;
+        for (auto& optionIt : mGraphDescriptor.output_configs()) {
+            if (optionIt.stream_id() == streamId) {
+                outputDescriptor = optionIt;
+                foundDesc = true;
+                break;
+            }
+        }
+        if (!foundDesc) {
+            LOG(ERROR) << "no matching output config for requested id " << streamId;
+            return Status::INVALID_ARGUMENT;
+        }
+        std::function<Status(std::shared_ptr<MemHandle>)> packetCb =
+            [this, streamId](std::shared_ptr<MemHandle> handle) -> Status {
+            return this->forwardOutputDataToClient(streamId, handle);
+        };
+
+        std::function<void(std::string)> errorCb = [this, streamId](std::string m) {
+            std::string source = "StreamManager:" + std::to_string(streamId) + " : " + m;
+            this->queueError(source, m, false);
+        };
+
+        std::function<void()> eos = [this, streamId]() {
+            std::string source = "StreamManager:" + std::to_string(streamId);
+            std::lock_guard<std::mutex> lock(this->mEngineLock);
+            this->queueCommand(source, EngineCommand::Type::POLL_COMPLETE);
+        };
+
+        std::shared_ptr<StreamEngineInterface> engine = std::make_shared<StreamCallback>(
+            std::move(eos), std::move(errorCb), std::move(packetCb));
+        mStreamManagers.emplace(configIt.first, mStreamFactory.getStreamManager(
+                                                    outputDescriptor, engine, maxInFlightPackets));
+        if (mStreamManagers[streamId] == nullptr) {
+            LOG(ERROR) << "unable to create stream manager for stream " << streamId;
+            return Status::INTERNAL_ERROR;
+        }
+    }
+    return Status::SUCCESS;
+}
+
+Status DefaultEngine::forwardOutputDataToClient(int streamId,
+                                                std::shared_ptr<MemHandle>& dataHandle) {
+    if (streamId == mDisplayStream) {
+        // TODO: dispatch to display
+        if (mConfigBuilder.clientConfigEnablesDisplayStream()) {
+            return mClient->dispatchPacketToClient(streamId, dataHandle);
+        }
+    }
+    return mClient->dispatchPacketToClient(streamId, dataHandle);
+}
+
+Status DefaultEngine::populateInputManagers(const ClientConfig& config) {
+    if (mIgnoreInputManager) {
+        return Status::SUCCESS;
+    }
+    proto::InputConfig inputDescriptor;
+    int selectedId;
+
+    if (config.getInputConfigId(&selectedId) != Status::SUCCESS) {
+        return Status::INVALID_ARGUMENT;
+    }
+
+    for (auto& inputIt : mGraphDescriptor.input_configs()) {
+        if (selectedId == inputIt.config_id()) {
+            inputDescriptor = inputIt;
+            std::shared_ptr<InputCallback> cb = std::make_shared<InputCallback>(
+                selectedId,
+                [this](int id) {
+                    std::string source = "InputManager:" + std::to_string(id);
+                    this->queueError(source, "", false);
+                },
+                [](int /*streamId */, const InputFrame& /* frame */) { return Status::SUCCESS; });
+            mInputManagers.emplace(selectedId,
+                                   mInputFactory.createInputManager(inputDescriptor, cb));
+            if (mInputManagers[selectedId] == nullptr) {
+                LOG(ERROR) << "unable to create input manager for stream " << selectedId;
+                // TODO: Add print
+                return Status::INTERNAL_ERROR;
+            }
+            return Status::SUCCESS;
+        }
+    }
+    return Status::INVALID_ARGUMENT;
+}
+
+/**
+ * Engine Command Queue and Error Queue handling
+ */
+void DefaultEngine::processCommands() {
+    std::unique_lock<std::mutex> lock(mEngineLock);
+    while (1) {
+        LOG(INFO) << "Engine::Waiting on commands ";
+        mWakeLooper.wait(lock, [this] {
+            if (this->mCommandQueue.empty() && !mCurrentPhaseError) {
+                return false;
+            } else {
+                return true;
+            }
+        });
+        if (mCurrentPhaseError) {
+            mErrorQueue.push(*mCurrentPhaseError);
+
+            processComponentError(mCurrentPhaseError->source);
+            mCurrentPhaseError = nullptr;
+            std::queue<EngineCommand> empty;
+            std::swap(mCommandQueue, empty);
+            continue;
+        }
+        EngineCommand ec = mCommandQueue.front();
+        mCommandQueue.pop();
+        switch (ec.cmdType) {
+            case EngineCommand::Type::BROADCAST_CONFIG:
+                LOG(INFO) << "Engine::Received broacast config request";
+                (void)broadcastClientConfig();
+                break;
+            case EngineCommand::Type::BROADCAST_START_RUN:
+                LOG(INFO) << "Engine::Received broacast run request";
+                (void)broadcastStartRun();
+                break;
+            case EngineCommand::Type::BROADCAST_INITIATE_STOP:
+                if (ec.source.find("ClientInterface") != std::string::npos) {
+                    mStopFromClient = true;
+                }
+                LOG(INFO) << "Engine::Received broacast stop with flush request";
+                broadcastStopWithFlush();
+                break;
+            case EngineCommand::Type::POLL_COMPLETE:
+                LOG(INFO) << "Engine::Received Poll stream managers for completion request";
+                int id = getStreamIdFromSource(ec.source);
+                bool all_done = true;
+                for (auto& it : mStreamManagers) {
+                    if (it.first == id) {
+                        continue;
+                    }
+                    if (it.second->getState() != StreamManager::State::STOPPED) {
+                        all_done = false;
+                    }
+                }
+                if (all_done) {
+                    broadcastStopComplete();
+                }
+                break;
+        }
+    }
+}
+
+void DefaultEngine::processComponentError(std::string source) {
+    if (mCurrentPhase == kRunPhase || mCurrentPhase == kStopPhase) {
+        (void)broadcastHalt();
+    }
+    if (source.find("ClientInterface") != std::string::npos) {
+        (void)broadcastReset();
+    }
+}
+
+void DefaultEngine::queueCommand(std::string source, EngineCommand::Type type) {
+    mCommandQueue.push(EngineCommand(source, type));
+    mWakeLooper.notify_all();
+}
+
+void DefaultEngine::queueError(std::string source, std::string msg, bool fatal) {
+    std::lock_guard<std::mutex> lock(mEngineLock);
+    // current phase already has an error report
+    if (!mCurrentPhaseError) {
+        mCurrentPhaseError = std::make_unique<ComponentError>(source, msg, mCurrentPhase, fatal);
+        mWakeLooper.notify_all();
+    }
+}
+
+/**
+ * InputCallback implementation
+ */
+InputCallback::InputCallback(int id, const std::function<void(int)>&& cb,
+                             const std::function<Status(int, const InputFrame&)>&& packetCb)
+    : mErrorCallback(cb), mPacketHandler(packetCb), mInputId(id) {
+}
+
+Status InputCallback::dispatchInputFrame(int streamId, const InputFrame& frame) {
+    return mPacketHandler(streamId, frame);
+}
+void InputCallback::notifyInputError() {
+    mErrorCallback(mInputId);
+}
+
+/**
+ * StreamCallback implementation
+ */
+StreamCallback::StreamCallback(
+    const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
+    const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler)
+    : mErrorHandler(errorCb), mEndOfStreamHandler(eos), mPacketHandler(packetHandler) {
+}
+
+void StreamCallback::notifyError(std::string msg) {
+    mErrorHandler(msg);
+}
+
+void StreamCallback::notifyEndOfStream() {
+    mEndOfStreamHandler();
+}
+
+Status StreamCallback::dispatchPacket(const std::shared_ptr<MemHandle>& packet) {
+    return mPacketHandler(packet);
+}
+
+}  // namespace engine
+}  // namespace runner
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
diff --git a/computepipe/runner/engine/DefaultEngine.h b/computepipe/runner/engine/DefaultEngine.h
new file mode 100644
index 0000000..5576914
--- /dev/null
+++ b/computepipe/runner/engine/DefaultEngine.h
@@ -0,0 +1,327 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef COMPUTEPIPE_RUNNER_ENGINE_DEFAULT_H
+#define COMPUTEPIPE_RUNNER_ENGINE_DEFAULT_H
+
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include "ConfigBuilder.h"
+#include "InputManager.h"
+#include "Options.pb.h"
+#include "RunnerEngine.h"
+#include "StreamManager.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+class InputCallback;
+
+/**
+ * EngineCommand represents client requests or error events.
+ * Each command is queued, and processed by the engine thread.
+ */
+struct EngineCommand {
+  public:
+    enum Type {
+        BROADCAST_CONFIG = 0,
+        BROADCAST_START_RUN,
+        BROADCAST_INITIATE_STOP,
+        POLL_COMPLETE,
+    };
+    std::string source;
+    Type cmdType;
+    explicit EngineCommand(std::string s, Type t) : source(s), cmdType(t) {
+    }
+};
+
+/**
+ * Component Error represents the type of error reported by a component.
+ */
+struct ComponentError {
+  public:
+    bool isFatal;
+    std::string source;
+    std::string message;
+    std::string currentPhase;
+    explicit ComponentError(std::string s, std::string m, std::string p, bool fatal = false)
+        : isFatal(fatal), source(s), message(m), currentPhase(p) {
+    }
+};
+
+/**
+ * Default Engine implementation.
+ * Takes ownership of externally instantiated graph & client interface
+ * instances. Brings the runner online. Manages components.
+ * Responds to client events.
+ */
+class DefaultEngine : public RunnerEngine {
+  public:
+    static constexpr char kDisplayStreamId[] = "display_stream:";
+    static constexpr char kNoInputManager[] = "no_input_manager";
+    static constexpr char kResetPhase[] = "Reset";
+    static constexpr char kConfigPhase[] = "Config";
+    static constexpr char kRunPhase[] = "Running";
+    static constexpr char kStopPhase[] = "Stopping";
+    /**
+     * Methods from Runner Engine to override
+     */
+    Status setArgs(std::string engine_args) override;
+    void setClientInterface(std::unique_ptr<client_interface::ClientInterface>&& client) override;
+    void setPrebuiltGraph(std::unique_ptr<graph::PrebuiltGraph>&& graph) override;
+    Status activate() override;
+    /**
+     * Methods from ClientEngineInterface to override
+     */
+    Status processClientConfigUpdate(const proto::ConfigurationCommand& command) override;
+    Status processClientCommand(const proto::ControlCommand& command) override;
+    Status freePacket(const std::shared_ptr<MemHandle>& packet) override;
+    /**
+     * Methods from PrebuiltEngineInterface to override
+     */
+    void DispatchPixelData(int streamId, int64_t timestamp, const uint8_t* pixels, int width,
+                           int height, int step, PixelFormat format) override;
+
+    void DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) override;
+
+    void DispatchGraphTerminationMessage(Status s, std::string&& msg) override;
+
+  private:
+    // TODO: b/147704051 Add thread analyzer annotations
+    /**
+     * BroadCast Client config to all components. If all components handle the
+     * notification correctly, then broadcast transition complete.
+     * Successful return from this function implies runner has transitioned to
+     * configuration done.
+     * @Lock held mEngineLock
+     */
+    Status broadcastClientConfig();
+    /**
+     * Abort an ongoing attempt to apply client configs.
+     * @Lock held mEngineLock
+     */
+    void abortClientConfig(const ClientConfig& config, bool resetGraph = false);
+    /**
+     * BroadCast start to all components. The order of entry into run phase
+     * notification delivery is downstream components to upstream components.
+     * If all components handle the entry notification correctly then broadcast
+     * transition complete notification again from down stream to upstream.
+     * Successful return from this function implies runner has transitioned to
+     * running.
+     * @Lock held mEngineLock
+     */
+    Status broadcastStartRun();
+    /**
+     * BroadCast abort of started run to given components. This gets called if during
+     * broadcastStartRun(), one of the components failed to set itself up for the run. In that case
+     * the components that had successfully acknowledged broacastStartRun(),
+     * need to be told to abort. We transition back to config phase at the end
+     * of this call.
+     * @Lock held mEngineLock
+     */
+    void broadcastAbortRun(const std::vector<int>& streamIds, const std::vector<int>& inputIds,
+                           bool graph = false);
+
+    /**
+     * Broadcast stop with flush to all components. The stop with flush phase
+     * entry notification is sent from in the order of upstream to downstream.
+     * A successful return can leave the runner in stopping phase.
+     * We transition to stop completely, once all inflight traffic has been drained at a later
+     * point, identified by stream managers.
+     * @Lock held mEngineLock
+     */
+    Status broadcastStopWithFlush();
+    /**
+     * Broadcast transtion to stop complete. This is a confirmation to all
+     * components that stop has finished. At the end of this we transition back
+     * to config phase.
+     * @Lock held mEngineLock
+     */
+    Status broadcastStopComplete();
+    /**
+     * Broadcast halt to all components. All inflight traffic is dropped.
+     * Successful return from this function implies all components have
+     * exited run phase and are back in config phase.
+     * @Lock held mEngineLock
+     */
+    void broadcastHalt();
+    /**
+     * Broadcast reset to all components. All components drop client
+     * specific configuration and transition to reset state. For RAII
+     * components, they are freed at this point. ALso resets the mConfigBuilder
+     * to its original state. Successful return puts the runner in reset phase.
+     * @Lock held mEngineLock
+     */
+    void broadcastReset();
+    /**
+     * Populate stream managers for a given client config. For each client
+     * selected output config, we generate stream managers. During reset phase
+     * we clear out any previously constructed stream managers. This should be
+     * invoked only in response to applyConfigs() issued by client.
+     * @Lock held mEngineLock
+     */
+    Status populateStreamManagers(const ClientConfig& config);
+    /**
+     * Populate input managers for a given client config. For each client
+     * selected output config, we generate input managers. During reset phase
+     * we clear out any previously constructed input managers. This should be
+     * invoked only in response to applyConfigs() issued by client.
+     * @Lock held mEngineLock
+     */
+    Status populateInputManagers(const ClientConfig& config);
+    /**
+     * Helper method to forward packet to client interface for transmission
+     */
+    Status forwardOutputDataToClient(int streamId, std::shared_ptr<MemHandle>& handle);
+    /**
+     * Helper to handle error notification from components, in the errorQueue.
+     * In case the source of the error is client interface, it will
+     * broadcastReset().
+     * This called in the mEngineThread when processing an entry from the
+     * errorQueue,
+     * @Lock acquires mEngineLock.
+     */
+    void processComponentError(std::string source);
+    /**
+     * Method run by the engine thread to process commands.
+     * Uses condition variable. Acquires lock mEngineLock to access
+     * command queues.
+     */
+    void processCommands();
+    /**
+     * Method run by external components to queue commands to the engine.
+     * Must be called with mEngineLock held. Wakes up the looper.
+     */
+    void queueCommand(std::string source, EngineCommand::Type type);
+    /**
+     * Method called by component reporting error.
+     * This will acquire mEngineLock and queue the error.
+     */
+    void queueError(std::string source, std::string msg, bool fatal);
+    /**
+     * client interface handle
+     */
+    std::unique_ptr<client_interface::ClientInterface> mClient = nullptr;
+    /**
+     * builder to build up client config incrementally.
+     */
+    ConfigBuilder mConfigBuilder;
+    /**
+     * Stream management members
+     */
+    std::map<int, std::unique_ptr<stream_manager::StreamManager>> mStreamManagers;
+    stream_manager::StreamManagerFactory mStreamFactory;
+    /**
+     * Input manager members
+     */
+    std::map<int, std::unique_ptr<input_manager::InputManager>> mInputManagers;
+    input_manager::InputManagerFactory mInputFactory;
+    /**
+     * stream to dump to display for debug purposes
+     */
+    int32_t mDisplayStream = ClientConfig::kInvalidId;
+    /**
+     * graph descriptor
+     */
+    proto::Options mGraphDescriptor;
+    std::unique_ptr<graph::PrebuiltGraph> mGraph;
+    /**
+     * stop signal source
+     */
+    bool mStopFromClient = true;
+    /**
+     * Phase management members
+     */
+    std::string mCurrentPhase = kResetPhase;
+    std::mutex mEngineLock;
+    /**
+     * Used to track the first error occurrence for a given phase.
+     */
+    std::unique_ptr<ComponentError> mCurrentPhaseError = nullptr;
+
+    /**
+     * Queue for client commands
+     */
+    std::queue<EngineCommand> mCommandQueue;
+    /**
+     * Queue for error notifications
+     */
+    std::queue<ComponentError> mErrorQueue;
+    /**
+     * Engine looper
+     */
+    std::unique_ptr<std::thread> mEngineThread;
+    /**
+     * Condition variable for looper
+     */
+    std::condition_variable mWakeLooper;
+    /**
+     * ignore input manager allocation
+     */
+    bool mIgnoreInputManager = false;
+};
+
+/**
+ * Handles callbacks from individual stream managers as specified in the
+ * StreamEngineInterface.
+ */
+class StreamCallback : public stream_manager::StreamEngineInterface {
+  public:
+    explicit StreamCallback(
+        const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
+        const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler);
+    void notifyEndOfStream() override;
+    void notifyError(std::string msg) override;
+    Status dispatchPacket(const std::shared_ptr<MemHandle>& outData) override;
+    ~StreamCallback() = default;
+
+  private:
+    std::function<void(std::string)> mErrorHandler;
+    std::function<void()> mEndOfStreamHandler;
+    std::function<Status(const std::shared_ptr<MemHandle>&)> mPacketHandler;
+};
+
+/**
+ * Handles callbacks from input managers. Forwards frames to the graph.
+ * Only used if graph implementation is local
+ */
+class InputCallback : public input_manager::InputEngineInterface {
+  public:
+    explicit InputCallback(int id, const std::function<void(int)>&& cb,
+                           const std::function<Status(int, const InputFrame&)>&& packetCb);
+    Status dispatchInputFrame(int streamId, const InputFrame& frame) override;
+    void notifyInputError() override;
+    ~InputCallback() = default;
+
+  private:
+    std::function<void(int)> mErrorCallback;
+    std::function<Status(int, const InputFrame&)> mPacketHandler;
+    int mInputId;
+};
+
+}  // namespace engine
+}  // namespace runner
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
+
+#endif
diff --git a/computepipe/runner/engine/Factory.cpp b/computepipe/runner/engine/Factory.cpp
new file mode 100644
index 0000000..67b3cd3
--- /dev/null
+++ b/computepipe/runner/engine/Factory.cpp
@@ -0,0 +1,46 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "DefaultEngine.h"
+#include "RunnerEngine.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+namespace {
+std::unique_ptr<DefaultEngine> createDefaultEngine(std::string engine_args) {
+    std::unique_ptr<DefaultEngine> engine = std::make_unique<DefaultEngine>();
+    if (engine->setArgs(engine_args) != Status::SUCCESS) {
+        return nullptr;
+    }
+    return engine;
+}
+}  // namespace
+
+std::unique_ptr<RunnerEngine> RunnerEngineFactory::createRunnerEngine(std::string engine,
+                                                                      std::string engine_args) {
+    if (engine == kDefault) {
+        return createDefaultEngine(engine_args);
+    }
+    return nullptr;
+}
+
+}  // namespace engine
+}  // namespace runner
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
diff --git a/computepipe/runner/engine/include/RunnerEngine.h b/computepipe/runner/engine/include/RunnerEngine.h
new file mode 100644
index 0000000..778099d
--- /dev/null
+++ b/computepipe/runner/engine/include/RunnerEngine.h
@@ -0,0 +1,71 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef COMPUTEPIPE_RUNNER_ENGINE_H
+#define COMPUTEPIPE_RUNNER_ENGINE_H
+
+#include <memory>
+#include <string>
+
+#include "ClientInterface.h"
+#include "PrebuiltGraph.h"
+#include "types/Status.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace runner {
+namespace engine {
+
+/**
+ * Class that offers an interface into an engine. It derives from the client,
+ * prebuilt -> engine interfaces to enforce that any instantiation of an engine
+ * needs to provide an implementation for those interfaces.
+ */
+class RunnerEngine : public client_interface::ClientEngineInterface,
+                     public graph::PrebuiltEngineInterface {
+  public:
+    /**
+     * Any args that a given engine instance needs in order to configure itself.
+     */
+    virtual Status setArgs(std::string engine_args) = 0;
+    /**
+     * Set the client and the prebuilt graph instances
+     */
+    virtual void setClientInterface(std::unique_ptr<client_interface::ClientInterface>&& client) = 0;
+
+    virtual void setPrebuiltGraph(std::unique_ptr<graph::PrebuiltGraph>&& graph) = 0;
+    /**
+     * Activates the client interface and advertises to the rest of the world
+     * that the runner is online
+     */
+    virtual Status activate() = 0;
+};
+
+class RunnerEngineFactory {
+  public:
+    static constexpr char kDefault[] = "default_engine";
+    std::unique_ptr<RunnerEngine> createRunnerEngine(std::string engine, std::string engine_args);
+    RunnerEngineFactory(const RunnerEngineFactory&) = delete;
+    RunnerEngineFactory& operator=(const RunnerEngineFactory&) = delete;
+    RunnerEngineFactory() = default;
+};
+
+}  // namespace engine
+}  // namespace runner
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
+
+#endif
diff --git a/computepipe/runner/graph/include/PrebuiltGraph.h b/computepipe/runner/graph/include/PrebuiltGraph.h
index f96a8ad..66c069b 100644
--- a/computepipe/runner/graph/include/PrebuiltGraph.h
+++ b/computepipe/runner/graph/include/PrebuiltGraph.h
@@ -51,8 +51,6 @@
     // No copy or move constructors or operators are available.
     PrebuiltGraph(const PrebuiltGraph&) = delete;
     PrebuiltGraph& operator=(const PrebuiltGraph&) = delete;
-    PrebuiltGraph(PrebuiltGraph&&) = delete;
-    PrebuiltGraph& operator=(PrebuiltGraph&&) = delete;
 
     // Override RunnerComponent interface functions for applying configs,
     // starting the graph and stopping the graph.
diff --git a/computepipe/runner/stream_manager/SemanticManager.cpp b/computepipe/runner/stream_manager/SemanticManager.cpp
index fc73c74..abb4093 100644
--- a/computepipe/runner/stream_manager/SemanticManager.cpp
+++ b/computepipe/runner/stream_manager/SemanticManager.cpp
@@ -83,7 +83,7 @@
     std::lock_guard<std::mutex> lock(mStateLock);
     if (mState == CONFIG_DONE && e.isPhaseEntry()) {
         mState = RUNNING;
-        return ILLEGAL_STATE;
+        return SUCCESS;
     }
     if (mState == RESET) {
         /* Cannot get to running phase from reset state without config phase*/