Adding a GrpcGraph that communicates with a remote graph through GRPC.

1) Added asynchronous GRPC client that communicates with the service
through completion queues
2) Added a stream observer that observes output streams and relays the
data back to the engine. The stream observer also reports graph stoppage
back to the graph by notifying it when all output streams are closed.
3) Added a test with a sample synchronous service to test out the core
of the algorithm implemented.

Bug: 145227010
Test: Added a unit test
Change-Id: I88667558865f66cb2fcb08a2c78008ab58cda603
diff --git a/computepipe/example/Runner.cpp b/computepipe/example/Runner.cpp
index b46345e..bbea0b1 100644
--- a/computepipe/example/Runner.cpp
+++ b/computepipe/example/Runner.cpp
@@ -52,10 +52,11 @@
 
 int main(int /* argc */, char** /* argv */) {
     std::shared_ptr<RunnerEngine> engine =
-        sEngineFactory.createRunnerEngine(RunnerEngineFactory::kDefault, "");
+            sEngineFactory.createRunnerEngine(RunnerEngineFactory::kDefault, "");
 
     std::unique_ptr<PrebuiltGraph> graph;
-    graph.reset(PrebuiltGraph::GetPrebuiltGraphFromLibrary("libfacegraph.so", engine));
+    graph.reset(android::automotive::computepipe::graph::GetLocalGraphFromLibrary("libfacegraph.so",
+                                                                                  engine));
 
     Options options = graph->GetSupportedGraphConfigs();
     engine->setPrebuiltGraph(std::move(graph));
diff --git a/computepipe/proto/Android.bp b/computepipe/proto/Android.bp
index 68eab52..27087b7 100644
--- a/computepipe/proto/Android.bp
+++ b/computepipe/proto/Android.bp
@@ -44,9 +44,9 @@
             static_libs: [
                 "libprotobuf-cpp-lite",
             ],
-            shared: {
-                enabled: false,
-            },
+            shared_libs: [
+                "liblog",
+            ],
         },
     },
 }
diff --git a/computepipe/runner/engine/DefaultEngine.cpp b/computepipe/runner/engine/DefaultEngine.cpp
index 9b682ac..9f1531b 100644
--- a/computepipe/runner/engine/DefaultEngine.cpp
+++ b/computepipe/runner/engine/DefaultEngine.cpp
@@ -57,6 +57,9 @@
 void DefaultEngine::setPrebuiltGraph(std::unique_ptr<PrebuiltGraph>&& graph) {
     mGraph = std::move(graph);
     mGraphDescriptor = mGraph->GetSupportedGraphConfigs();
+    if (mGraph->GetGraphType() == graph::PrebuiltGraphType::REMOTE) {
+        mIgnoreInputManager = true;
+    }
 }
 
 Status DefaultEngine::setArgs(std::string engine_args) {
diff --git a/computepipe/runner/graph/Android.bp b/computepipe/runner/graph/Android.bp
index 589c00f..614eabc 100644
--- a/computepipe/runner/graph/Android.bp
+++ b/computepipe/runner/graph/Android.bp
@@ -25,7 +25,41 @@
     export_include_dirs: ["include"],
     static_libs: [
         "computepipe_runner_component",
+    ],
+
+    header_libs: ["computepipe_runner_includes"],
+    include_dirs: [
+          "packages/services/Car/computepipe",
+          "packages/services/Car/computepipe/runner/graph",
+    ],
+
+    shared_libs: [
         "libcomputepipeprotos",
+        "libbase",
+        "libdl",
+        "liblog",
+        "libutils",
+    ],
+
+    srcs: [
+        "LocalPrebuiltGraph.cpp",
+    ],
+}
+
+cc_library {
+    name: "computepipe_grpc_graph",
+
+    cflags: [
+        "-Wall",
+        "-Werror",
+        "-Wextra",
+        "-Wno-unused-parameter",
+    ],
+
+    export_include_dirs: ["include"],
+    static_libs: [
+        "computepipe_runner_component",
+        "computepipe_grpc_graph_proto",
     ],
 
     header_libs: ["computepipe_runner_includes"],
@@ -36,13 +70,16 @@
 
     shared_libs: [
         "libbase",
+        "libcomputepipeprotos",
         "libdl",
+        "libgrpc++",
         "liblog",
         "libutils",
-        "libprotobuf-cpp-lite",
+        "libprotobuf-cpp-full",
     ],
 
     srcs: [
-        "PrebuiltGraph.cpp",
+        "GrpcGraph.cpp",
+        "StreamSetObserver.cpp",
     ],
 }
diff --git a/computepipe/runner/graph/GrpcGraph.cpp b/computepipe/runner/graph/GrpcGraph.cpp
new file mode 100644
index 0000000..6967230
--- /dev/null
+++ b/computepipe/runner/graph/GrpcGraph.cpp
@@ -0,0 +1,464 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "GrpcGraph.h"
+
+#include <cstdlib>
+
+#include <android-base/logging.h>
+#include <grpcpp/grpcpp.h>
+
+#include "ClientConfig.pb.h"
+#include "GrpcGraph.h"
+#include "InputFrame.h"
+#include "RunnerComponent.h"
+#include "prebuilt_interface.h"
+#include "types/Status.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace graph {
+namespace {
+constexpr int64_t kRpcDeadlineMilliseconds = 100;
+
+template <class ResponseType, class RpcType>
+std::pair<Status, std::string> FinishRpcAndGetResult(
+        ::grpc::ClientAsyncResponseReader<RpcType>* rpc, ::grpc::CompletionQueue* cq,
+        ResponseType* response) {
+    int random_tag = rand();
+    ::grpc::Status grpcStatus;
+    rpc->Finish(response, &grpcStatus, reinterpret_cast<void*>(random_tag));
+    bool ok = false;
+    void* got_tag;
+    if (!cq->Next(&got_tag, &ok)) {
+        LOG(ERROR) << "Unexpected shutdown of the completion queue";
+        return std::pair(Status::FATAL_ERROR, "Unexpected shutdown of the completion queue");
+    }
+
+    if (!ok) {
+        LOG(ERROR) << "Unable to complete RPC request";
+        return std::pair(Status::FATAL_ERROR, "Unable to complete RPC request");
+    }
+
+    CHECK_EQ(got_tag, reinterpret_cast<void*>(random_tag));
+    if (!grpcStatus.ok()) {
+        std::string error_message =
+                std::string("Grpc failed with error: ") + grpcStatus.error_message();
+        LOG(ERROR) << error_message;
+        return std::pair(Status::FATAL_ERROR, std::move(error_message));
+    }
+
+    return std::pair(Status::SUCCESS, std::string(""));
+}
+
+}  // namespace
+
+PrebuiltGraphState GrpcGraph::GetGraphState() const {
+    std::lock_guard lock(mLock);
+    return mGraphState;
+}
+
+Status GrpcGraph::GetStatus() const {
+    std::lock_guard lock(mLock);
+    return mStatus;
+}
+
+std::string GrpcGraph::GetErrorMessage() const {
+    std::lock_guard lock(mLock);
+    return mErrorMessage;
+}
+
+Status GrpcGraph::initialize(const std::string& address,
+                             std::weak_ptr<PrebuiltEngineInterface> engineInterface) {
+    std::shared_ptr<::grpc::ChannelCredentials> creds = ::grpc::InsecureChannelCredentials();
+    std::shared_ptr<::grpc::Channel> channel = ::grpc::CreateChannel(address, creds);
+    mGraphStub = proto::GrpcGraphService::NewStub(channel);
+    mEngineInterface = engineInterface;
+
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+    ::grpc::CompletionQueue cq;
+
+    proto::GraphOptionsRequest getGraphOptionsRequest;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::GraphOptionsResponse>> rpc(
+            mGraphStub->AsyncGetGraphOptions(&context, getGraphOptionsRequest, &cq));
+
+    proto::GraphOptionsResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to get graph options: " << mErrorMessage;
+        return Status::FATAL_ERROR;
+    }
+
+    std::string serialized_options = response.serialized_options();
+    if (!mGraphConfig.ParseFromString(serialized_options)) {
+        mErrorMessage = "Failed to parse graph options";
+        LOG(ERROR) << "Failed to parse graph options";
+        return Status::FATAL_ERROR;
+    }
+
+    mGraphState = PrebuiltGraphState::STOPPED;
+    return Status::SUCCESS;
+}
+
+// Function to confirm that there would be no further changes to the graph configuration. This
+// needs to be called before starting the graph.
+Status GrpcGraph::handleConfigPhase(const runner::ClientConfig& e) {
+    std::lock_guard lock(mLock);
+    if (mGraphState == PrebuiltGraphState::UNINITIALIZED) {
+        mStatus = Status::ILLEGAL_STATE;
+        return Status::ILLEGAL_STATE;
+    }
+
+    // handleConfigPhase is a blocking call, so abort call is pointless for this RunnerEvent.
+    if (e.isAborted()) {
+        mStatus = Status::INVALID_ARGUMENT;
+        return mStatus;
+    } else if (e.isTransitionComplete()) {
+        mStatus = Status::SUCCESS;
+        return mStatus;
+    }
+
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+    ::grpc::CompletionQueue cq;
+
+    std::string serializedConfig = e.getSerializedClientConfig();
+    proto::SetGraphConfigRequest setGraphConfigRequest;
+    setGraphConfigRequest.set_serialized_config(std::move(serializedConfig));
+
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc(
+            mGraphStub->AsyncSetGraphConfig(&context, setGraphConfigRequest, &cq));
+
+    proto::StatusResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Rpc failed while trying to set configuration";
+        return mStatus;
+    }
+
+    if (response.code() != proto::RemoteGraphStatusCode::SUCCESS) {
+        LOG(ERROR) << "Failed to cofngure remote graph. " << response.message();
+    }
+
+    mStatus = static_cast<Status>(static_cast<int>(response.code()));
+    mErrorMessage = response.message();
+
+    mStreamSetObserver = std::make_unique<StreamSetObserver>(e, this);
+
+    return mStatus;
+}
+
+// Starts the graph.
+Status GrpcGraph::handleExecutionPhase(const runner::RunnerEvent& e) {
+    std::lock_guard lock(mLock);
+    if (mGraphState != PrebuiltGraphState::STOPPED) {
+        mStatus = Status::ILLEGAL_STATE;
+        return mStatus;
+    }
+
+    if (e.isAborted()) {
+        // Starting the graph is a blocking call and cannot be aborted in between.
+        mStatus = Status::INVALID_ARGUMENT;
+        return mStatus;
+    } else if (e.isTransitionComplete()) {
+        mStatus = Status::SUCCESS;
+        return mStatus;
+    }
+
+    // Start observing the output streams
+    mStatus = mStreamSetObserver->startObservingStreams();
+    if (mStatus != Status::SUCCESS) {
+        mErrorMessage = "Failed to observe output streams";
+        return mStatus;
+    }
+
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+
+    proto::StartGraphExecutionRequest startExecutionRequest;
+    ::grpc::CompletionQueue cq;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc(
+            mGraphStub->AsyncStartGraphExecution(&context, startExecutionRequest, &cq));
+
+    proto::StatusResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to start graph execution";
+        return mStatus;
+    }
+
+    mStatus = static_cast<Status>(static_cast<int>(response.code()));
+    mErrorMessage = response.message();
+
+    if (mStatus == Status::SUCCESS) {
+        mGraphState = PrebuiltGraphState::RUNNING;
+    }
+
+    return mStatus;
+}
+
+// Stops the graph while letting the graph flush output packets in flight.
+Status GrpcGraph::handleStopWithFlushPhase(const runner::RunnerEvent& e) {
+    std::lock_guard lock(mLock);
+    if (mGraphState != PrebuiltGraphState::RUNNING) {
+        return Status::ILLEGAL_STATE;
+    }
+
+    if (e.isAborted()) {
+        return Status::INVALID_ARGUMENT;
+    } else if (e.isTransitionComplete()) {
+        return Status::SUCCESS;
+    }
+
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+
+    proto::StopGraphExecutionRequest stopExecutionRequest;
+    stopExecutionRequest.set_stop_immediate(false);
+    ::grpc::CompletionQueue cq;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc(
+            mGraphStub->AsyncStopGraphExecution(&context, stopExecutionRequest, &cq));
+
+    proto::StatusResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to stop graph execution";
+        return Status::FATAL_ERROR;
+    }
+
+    // Stop observing streams immendiately.
+    mStreamSetObserver->stopObservingStreams(false);
+
+    mStatus = static_cast<Status>(static_cast<int>(response.code()));
+    mErrorMessage = response.message();
+
+    if (mStatus == Status::SUCCESS) {
+        mGraphState = PrebuiltGraphState::FLUSHING;
+    }
+
+    return mStatus;
+}
+
+// Stops the graph and cancels all the output packets.
+Status GrpcGraph::handleStopImmediatePhase(const runner::RunnerEvent& e) {
+    std::lock_guard lock(mLock);
+    if (mGraphState != PrebuiltGraphState::RUNNING) {
+        return Status::ILLEGAL_STATE;
+    }
+
+    if (e.isAborted()) {
+        return Status::INVALID_ARGUMENT;
+    } else if (e.isTransitionComplete()) {
+        return Status::SUCCESS;
+    }
+
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+
+    proto::StopGraphExecutionRequest stopExecutionRequest;
+    stopExecutionRequest.set_stop_immediate(true);
+    ::grpc::CompletionQueue cq;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc(
+            mGraphStub->AsyncStopGraphExecution(&context, stopExecutionRequest, &cq));
+
+    proto::StatusResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to stop graph execution";
+        return Status::FATAL_ERROR;
+    }
+
+    mStatus = static_cast<Status>(static_cast<int>(response.code()));
+    mErrorMessage = response.message();
+
+    // Stop observing streams immendiately.
+    mStreamSetObserver->stopObservingStreams(true);
+
+    if (mStatus == Status::SUCCESS) {
+        mGraphState = PrebuiltGraphState::STOPPED;
+    }
+    return mStatus;
+}
+
+Status GrpcGraph::handleResetPhase(const runner::RunnerEvent& e) {
+    std::lock_guard lock(mLock);
+    if (mGraphState != PrebuiltGraphState::STOPPED) {
+        return Status::ILLEGAL_STATE;
+    }
+
+    if (e.isAborted()) {
+        return Status::INVALID_ARGUMENT;
+    } else if (e.isTransitionComplete()) {
+        return Status::SUCCESS;
+    }
+
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+
+    proto::ResetGraphRequest resetGraphRequest;
+    ::grpc::CompletionQueue cq;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc(
+            mGraphStub->AsyncResetGraph(&context, resetGraphRequest, &cq));
+
+    proto::StatusResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to stop graph execution";
+        return Status::FATAL_ERROR;
+    }
+
+    mStatus = static_cast<Status>(static_cast<int>(response.code()));
+    mErrorMessage = response.message();
+    mStreamSetObserver.reset();
+
+    return mStatus;
+}
+
+Status GrpcGraph::SetInputStreamData(int /*streamIndex*/, int64_t /*timestamp*/,
+                                     const std::string& /*streamData*/) {
+    LOG(ERROR) << "Cannot set input stream for remote graphs";
+    return Status::FATAL_ERROR;
+}
+
+Status GrpcGraph::SetInputStreamPixelData(int /*streamIndex*/, int64_t /*timestamp*/,
+                                          const runner::InputFrame& /*inputFrame*/) {
+    LOG(ERROR) << "Cannot set input streams for remote graphs";
+    return Status::FATAL_ERROR;
+}
+
+Status GrpcGraph::StartGraphProfiling() {
+    std::lock_guard lock(mLock);
+    if (mGraphState != PrebuiltGraphState::RUNNING) {
+        return Status::ILLEGAL_STATE;
+    }
+
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+
+    proto::StartGraphProfilingRequest startProfilingRequest;
+    ::grpc::CompletionQueue cq;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc(
+            mGraphStub->AsyncStartGraphProfiling(&context, startProfilingRequest, &cq));
+
+    proto::StatusResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to start graph profiling";
+        return Status::FATAL_ERROR;
+    }
+
+    mStatus = static_cast<Status>(static_cast<int>(response.code()));
+    mErrorMessage = response.message();
+
+    return mStatus;
+}
+
+Status GrpcGraph::StopGraphProfiling() {
+    // Stopping profiling after graph has already stopped can be a no-op
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+
+    proto::StopGraphProfilingRequest stopProfilingRequest;
+    ::grpc::CompletionQueue cq;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc(
+            mGraphStub->AsyncStopGraphProfiling(&context, stopProfilingRequest, &cq));
+
+    proto::StatusResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to stop graph profiling";
+        return Status::FATAL_ERROR;
+    }
+
+    mStatus = static_cast<Status>(static_cast<int>(response.code()));
+    mErrorMessage = response.message();
+
+    return mStatus;
+}
+
+std::string GrpcGraph::GetDebugInfo() {
+    ::grpc::ClientContext context;
+    context.set_deadline(std::chrono::system_clock::now() +
+                         std::chrono::milliseconds(kRpcDeadlineMilliseconds));
+
+    proto::ProfilingDataRequest profilingDataRequest;
+    ::grpc::CompletionQueue cq;
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::ProfilingDataResponse>> rpc(
+            mGraphStub->AsyncGetProfilingData(&context, profilingDataRequest, &cq));
+
+    proto::ProfilingDataResponse response;
+    auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response);
+    if (mStatus != Status::SUCCESS) {
+        LOG(ERROR) << "Failed to get profiling info";
+        return "";
+    }
+
+    return response.data();
+}
+
+void GrpcGraph::dispatchPixelData(int streamId, int64_t timestamp_us,
+                                  const runner::InputFrame& frame) {
+    std::shared_ptr<PrebuiltEngineInterface> engineInterface = mEngineInterface.lock();
+    if (engineInterface) {
+        return engineInterface->DispatchPixelData(streamId, timestamp_us, frame);
+    }
+}
+
+void GrpcGraph::dispatchSerializedData(int streamId, int64_t timestamp_us,
+                                       std::string&& serialized_data) {
+    std::shared_ptr<PrebuiltEngineInterface> engineInterface = mEngineInterface.lock();
+    if (engineInterface) {
+        return engineInterface->DispatchSerializedData(streamId, timestamp_us,
+                                                       std::move(serialized_data));
+    }
+}
+
+void GrpcGraph::dispatchGraphTerminationMessage(Status status, std::string&& errorMessage) {
+    std::lock_guard lock(mLock);
+    mErrorMessage = std::move(errorMessage);
+    mStatus = status;
+    mGraphState = PrebuiltGraphState::STOPPED;
+    std::shared_ptr<PrebuiltEngineInterface> engineInterface = mEngineInterface.lock();
+    if (engineInterface) {
+        std::string errorMessageTmp = mErrorMessage;
+        engineInterface->DispatchGraphTerminationMessage(mStatus, std::move(errorMessageTmp));
+    }
+}
+
+std::unique_ptr<PrebuiltGraph> GetRemoteGraphFromAddress(
+        const std::string& address, std::weak_ptr<PrebuiltEngineInterface> engineInterface) {
+    auto prebuiltGraph = std::make_unique<GrpcGraph>();
+    Status status = prebuiltGraph->initialize(address, engineInterface);
+    if (status != Status::SUCCESS) {
+        return nullptr;
+    }
+
+    return prebuiltGraph;
+}
+
+}  // namespace graph
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
diff --git a/computepipe/runner/graph/GrpcGraph.h b/computepipe/runner/graph/GrpcGraph.h
new file mode 100644
index 0000000..d833dd2
--- /dev/null
+++ b/computepipe/runner/graph/GrpcGraph.h
@@ -0,0 +1,134 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef COMPUTEPIPE_RUNNER_GRAPH_INCLUDE_LOCALPREBUILTGRAPH_H
+#define COMPUTEPIPE_RUNNER_GRAPH_INCLUDE_LOCALPREBUILTGRAPH_H
+
+#include <condition_variable>
+#include <functional>
+#include <map>
+#include <memory>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+
+#include "ClientConfig.pb.h"
+#include "GrpcPrebuiltGraphService.grpc.pb.h"
+#include "GrpcPrebuiltGraphService.pb.h"
+#include "InputFrame.h"
+#include "Options.pb.h"
+#include "OutputConfig.pb.h"
+#include "PrebuiltEngineInterface.h"
+#include "PrebuiltGraph.h"
+#include "RunnerComponent.h"
+#include "StreamSetObserver.h"
+#include "types/Status.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace graph {
+
+class GrpcGraph : public PrebuiltGraph, public StreamGraphInterface {
+
+  public:
+    GrpcGraph() {}
+
+    virtual ~GrpcGraph() {}
+
+    Status initialize(const std::string& address,
+                      std::weak_ptr<PrebuiltEngineInterface> engineInterface);
+
+    // No copy or move constructors or operators are available.
+    GrpcGraph(const GrpcGraph&) = delete;
+    GrpcGraph& operator=(const GrpcGraph&) = delete;
+
+    // Override RunnerComponent interface functions for applying configs,
+    // starting the graph and stopping the graph.
+    Status handleConfigPhase(const runner::ClientConfig& e) override;
+    Status handleExecutionPhase(const runner::RunnerEvent& e) override;
+    Status handleStopWithFlushPhase(const runner::RunnerEvent& e) override;
+    Status handleStopImmediatePhase(const runner::RunnerEvent& e) override;
+    Status handleResetPhase(const runner::RunnerEvent& e) override;
+
+    PrebuiltGraphType GetGraphType() const override {
+        return PrebuiltGraphType::REMOTE;
+    }
+
+    PrebuiltGraphState GetGraphState() const override;
+    Status GetStatus() const override;
+    std::string GetErrorMessage() const override;
+
+    // Gets the supported graph config options.
+    const proto::Options& GetSupportedGraphConfigs() const override {
+        return mGraphConfig;
+    }
+
+    // Sets input stream data. The string is expected to be a serialized proto
+    // the definition of which is known to the graph.
+    Status SetInputStreamData(int streamIndex, int64_t timestamp,
+                              const std::string& streamData) override;
+
+    // Sets pixel data to the specified input stream index.
+    Status SetInputStreamPixelData(int streamIndex, int64_t timestamp,
+                                   const runner::InputFrame& inputFrame) override;
+
+    // Starts graph profiling at some point after starting the graph with profiling enabled.
+    Status StartGraphProfiling() override;
+
+    // Stops graph profiling.
+    Status StopGraphProfiling() override;
+
+    // Collects debugging and profiling information for the graph. The graph
+    // needs to be started with debugging enabled in order to get valid info.
+    std::string GetDebugInfo() override;
+
+    // Stream Graph interface
+    proto::GrpcGraphService::Stub* getServiceStub() override {
+        return mGraphStub.get();
+    }
+
+    void dispatchPixelData(int streamId, int64_t timestamp_us,
+                           const runner::InputFrame& frame) override;
+
+    void dispatchSerializedData(int streamId, int64_t timestamp_us,
+                                std::string&& serialized_data) override;
+
+    void dispatchGraphTerminationMessage(Status, std::string&&) override;
+  private:
+    mutable std::mutex mLock;
+
+    PrebuiltGraphState mGraphState = PrebuiltGraphState::UNINITIALIZED;
+
+    Status mStatus = Status::SUCCESS;
+
+    std::string mErrorMessage = "";
+
+    // Cached callback interface that is passed in from the runner.
+    std::weak_ptr<PrebuiltEngineInterface> mEngineInterface;
+
+    // Cached graph config.
+    proto::Options mGraphConfig;
+
+    std::unique_ptr<proto::GrpcGraphService::Stub> mGraphStub;
+
+    std::unique_ptr<StreamSetObserver> mStreamSetObserver;
+};
+
+}  // namespace graph
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
+
+#endif  // #define COMPUTEPIPE_RUNNER_GRAPH_INCLUDE_LOCALPREBUILTGRAPH_H
diff --git a/computepipe/runner/graph/PrebuiltGraph.cpp b/computepipe/runner/graph/LocalPrebuiltGraph.cpp
similarity index 62%
rename from computepipe/runner/graph/PrebuiltGraph.cpp
rename to computepipe/runner/graph/LocalPrebuiltGraph.cpp
index 2705d01..ae4cc8c 100644
--- a/computepipe/runner/graph/PrebuiltGraph.cpp
+++ b/computepipe/runner/graph/LocalPrebuiltGraph.cpp
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include "PrebuiltGraph.h"
+#include "LocalPrebuiltGraph.h"
 
 #include <android-base/logging.h>
 #include <dlfcn.h>
@@ -26,6 +26,7 @@
 
 #include "ClientConfig.pb.h"
 #include "InputFrame.h"
+#include "PrebuiltGraph.h"
 #include "RunnerComponent.h"
 #include "prebuilt_interface.h"
 #include "types/Status.h"
@@ -39,19 +40,19 @@
     {                                                                              \
         std::string func_name = std::string("PrebuiltComputepipeRunner_") + #name; \
         mPrebuiltGraphInstance->mFn##name =                                        \
-            dlsym(mPrebuiltGraphInstance->mHandle, func_name.c_str());             \
+                dlsym(mPrebuiltGraphInstance->mHandle, func_name.c_str());         \
         if (mPrebuiltGraphInstance->mFn##name == nullptr) {                        \
             initialized = false;                                                   \
             LOG(ERROR) << std::string(dlerror()) << std::endl;                     \
         }                                                                          \
     }
 
-std::mutex PrebuiltGraph::mCreationMutex;
-PrebuiltGraph* PrebuiltGraph::mPrebuiltGraphInstance = nullptr;
+std::mutex LocalPrebuiltGraph::mCreationMutex;
+LocalPrebuiltGraph* LocalPrebuiltGraph::mPrebuiltGraphInstance = nullptr;
 
 // Function to confirm that there would be no further changes to the graph configuration. This
 // needs to be called before starting the graph.
-Status PrebuiltGraph::handleConfigPhase(const runner::ClientConfig& e) {
+Status LocalPrebuiltGraph::handleConfigPhase(const runner::ClientConfig& e) {
     if (mGraphState.load() == PrebuiltGraphState::UNINITIALIZED) {
         return Status::ILLEGAL_STATE;
     }
@@ -64,39 +65,41 @@
     }
 
     std::string config = e.getSerializedClientConfig();
-    auto mappedFn =
-        (PrebuiltComputepipeRunner_ErrorCode(*)(const unsigned char*, size_t))mFnUpdateGraphConfig;
+    auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)(const unsigned char*,
+                                                            size_t))mFnUpdateGraphConfig;
     PrebuiltComputepipeRunner_ErrorCode errorCode =
-        mappedFn(reinterpret_cast<const unsigned char*>(config.c_str()), config.length());
+            mappedFn(reinterpret_cast<const unsigned char*>(config.c_str()), config.length());
     if (errorCode != PrebuiltComputepipeRunner_ErrorCode::SUCCESS) {
         return static_cast<Status>(static_cast<int>(errorCode));
     }
 
     // Set the pixel stream callback function. The same function will be called for all requested
     // pixel output streams.
-    if (mEngineInterface) {
+    if (mEngineInterface.lock() != nullptr) {
         auto pixelCallbackFn = (PrebuiltComputepipeRunner_ErrorCode(*)(
-            void (*)(void* cookie, int, int64_t, const uint8_t* pixels, int width, int height,
-                     int step, int format)))mFnSetOutputPixelStreamCallback;
+                void (*)(void* cookie, int, int64_t, const uint8_t* pixels, int width, int height,
+                         int step, int format)))mFnSetOutputPixelStreamCallback;
         PrebuiltComputepipeRunner_ErrorCode errorCode =
-            pixelCallbackFn(PrebuiltGraph::OutputPixelStreamCallbackFunction);
+                pixelCallbackFn(LocalPrebuiltGraph::OutputPixelStreamCallbackFunction);
         if (errorCode != PrebuiltComputepipeRunner_ErrorCode::SUCCESS) {
             return static_cast<Status>(static_cast<int>(errorCode));
         }
 
         // Set the serialized stream callback function. The same callback function will be invoked
         // for all requested serialized output streams.
-        auto streamCallbackFn = (PrebuiltComputepipeRunner_ErrorCode(*)(void (*)(
-            void* cookie, int, int64_t, const unsigned char*, size_t)))mFnSetOutputStreamCallback;
-        errorCode = streamCallbackFn(PrebuiltGraph::OutputStreamCallbackFunction);
+        auto streamCallbackFn = (PrebuiltComputepipeRunner_ErrorCode(*)(
+                void (*)(void* cookie, int, int64_t, const unsigned char*,
+                         size_t)))mFnSetOutputStreamCallback;
+        errorCode = streamCallbackFn(LocalPrebuiltGraph::OutputStreamCallbackFunction);
         if (errorCode != PrebuiltComputepipeRunner_ErrorCode::SUCCESS) {
             return static_cast<Status>(static_cast<int>(errorCode));
         }
 
         // Set the callback function for when the graph terminates.
         auto terminationCallback = (PrebuiltComputepipeRunner_ErrorCode(*)(
-            void (*)(void* cookie, const unsigned char*, size_t)))mFnSetGraphTerminationCallback;
-        errorCode = terminationCallback(PrebuiltGraph::GraphTerminationCallbackFunction);
+                void (*)(void* cookie, const unsigned char*,
+                         size_t)))mFnSetGraphTerminationCallback;
+        errorCode = terminationCallback(LocalPrebuiltGraph::GraphTerminationCallbackFunction);
         if (errorCode != PrebuiltComputepipeRunner_ErrorCode::SUCCESS) {
             return static_cast<Status>(static_cast<int>(errorCode));
         }
@@ -106,7 +109,7 @@
 }
 
 // Starts the graph.
-Status PrebuiltGraph::handleExecutionPhase(const runner::RunnerEvent& e) {
+Status LocalPrebuiltGraph::handleExecutionPhase(const runner::RunnerEvent& e) {
     if (mGraphState.load() != PrebuiltGraphState::STOPPED) {
         return Status::ILLEGAL_STATE;
     }
@@ -127,7 +130,7 @@
 }
 
 // Stops the graph while letting the graph flush output packets in flight.
-Status PrebuiltGraph::handleStopWithFlushPhase(const runner::RunnerEvent& e) {
+Status LocalPrebuiltGraph::handleStopWithFlushPhase(const runner::RunnerEvent& e) {
     if (mGraphState.load() != PrebuiltGraphState::RUNNING) {
         return Status::ILLEGAL_STATE;
     }
@@ -142,7 +145,7 @@
 }
 
 // Stops the graph and cancels all the output packets.
-Status PrebuiltGraph::handleStopImmediatePhase(const runner::RunnerEvent& e) {
+Status LocalPrebuiltGraph::handleStopImmediatePhase(const runner::RunnerEvent& e) {
     if (mGraphState.load() != PrebuiltGraphState::RUNNING) {
         return Status::ILLEGAL_STATE;
     }
@@ -156,7 +159,7 @@
     return StopGraphExecution(/* flushOutputFrames = */ false);
 }
 
-Status PrebuiltGraph::handleResetPhase(const runner::RunnerEvent& e) {
+Status LocalPrebuiltGraph::handleResetPhase(const runner::RunnerEvent& e) {
     if (mGraphState.load() != PrebuiltGraphState::STOPPED) {
         return Status::ILLEGAL_STATE;
     }
@@ -172,11 +175,12 @@
     return Status::SUCCESS;
 }
 
-PrebuiltGraph* PrebuiltGraph::GetPrebuiltGraphFromLibrary(
-    const std::string& prebuilt_library, std::shared_ptr<PrebuiltEngineInterface> engineInterface) {
-    std::unique_lock<std::mutex> lock(PrebuiltGraph::mCreationMutex);
+LocalPrebuiltGraph* LocalPrebuiltGraph::GetPrebuiltGraphFromLibrary(
+        const std::string& prebuilt_library,
+        std::weak_ptr<PrebuiltEngineInterface> engineInterface) {
+    std::unique_lock<std::mutex> lock(LocalPrebuiltGraph::mCreationMutex);
     if (mPrebuiltGraphInstance != nullptr) {
-        mPrebuiltGraphInstance = new PrebuiltGraph();
+        mPrebuiltGraphInstance = new LocalPrebuiltGraph();
     }
     if (mPrebuiltGraphInstance->mGraphState.load() != PrebuiltGraphState::UNINITIALIZED) {
         return mPrebuiltGraphInstance;
@@ -187,18 +191,20 @@
         bool initialized = true;
 
         // Load config and version number first.
-        const unsigned char* (*getVersionFn)() = (const unsigned char* (*)())dlsym(
-            mPrebuiltGraphInstance->mHandle, "PrebuiltComputepipeRunner_GetVersion");
+        const unsigned char* (*getVersionFn)() =
+                (const unsigned char* (*)())dlsym(mPrebuiltGraphInstance->mHandle,
+                                                  "PrebuiltComputepipeRunner_GetVersion");
         if (getVersionFn != nullptr) {
             mPrebuiltGraphInstance->mGraphVersion =
-                std::string(reinterpret_cast<const char*>(getVersionFn()));
+                    std::string(reinterpret_cast<const char*>(getVersionFn()));
         } else {
             LOG(ERROR) << std::string(dlerror());
             initialized = false;
         }
 
-        void (*getSupportedGraphConfigsFn)(const void**, size_t*) = (void (*)(
-            const void**, size_t*))dlsym(mPrebuiltGraphInstance->mHandle,
+        void (*getSupportedGraphConfigsFn)(const void**, size_t*) =
+                (void (*)(const void**,
+                          size_t*))dlsym(mPrebuiltGraphInstance->mHandle,
                                          "PrebuiltComputepipeRunner_GetSupportedGraphConfigs");
         if (getSupportedGraphConfigsFn != nullptr) {
             size_t graphConfigSize;
@@ -208,7 +214,7 @@
 
             if (graphConfigSize > 0) {
                 initialized &= mPrebuiltGraphInstance->mGraphConfig.ParseFromString(
-                    std::string(reinterpret_cast<const char*>(graphConfig), graphConfigSize));
+                        std::string(reinterpret_cast<const char*>(graphConfig), graphConfigSize));
             }
         } else {
             LOG(ERROR) << std::string(dlerror());
@@ -216,7 +222,7 @@
         }
 
         // Null callback interface is not acceptable.
-        if (initialized && engineInterface == nullptr) {
+        if (initialized && engineInterface.lock() == nullptr) {
             initialized = false;
         }
 
@@ -247,13 +253,13 @@
     return mPrebuiltGraphInstance;
 }
 
-PrebuiltGraph::~PrebuiltGraph() {
+LocalPrebuiltGraph::~LocalPrebuiltGraph() {
     if (mHandle) {
         dlclose(mHandle);
     }
 }
 
-Status PrebuiltGraph::GetStatus() const {
+Status LocalPrebuiltGraph::GetStatus() const {
     if (mGraphState.load() == PrebuiltGraphState::UNINITIALIZED) {
         return Status::ILLEGAL_STATE;
     }
@@ -263,12 +269,12 @@
     return static_cast<Status>(static_cast<int>(errorCode));
 }
 
-std::string PrebuiltGraph::GetErrorMessage() const {
+std::string LocalPrebuiltGraph::GetErrorMessage() const {
     if (mGraphState.load() == PrebuiltGraphState::UNINITIALIZED) {
         return "Graph has not been initialized";
     }
-    auto mappedFn =
-        (PrebuiltComputepipeRunner_ErrorCode(*)(unsigned char*, size_t, size_t*))mFnGetErrorMessage;
+    auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)(unsigned char*, size_t,
+                                                            size_t*))mFnGetErrorMessage;
     size_t errorMessageSize = 0;
 
     PrebuiltComputepipeRunner_ErrorCode errorCode = mappedFn(nullptr, 0, &errorMessageSize);
@@ -283,37 +289,40 @@
                        reinterpret_cast<char*>(&errorMessage[0]) + errorMessage.size());
 }
 
-Status PrebuiltGraph::SetInputStreamData(int streamIndex, int64_t timestamp,
-                                         const std::string& streamData) {
+Status LocalPrebuiltGraph::SetInputStreamData(int streamIndex, int64_t timestamp,
+                                              const std::string& streamData) {
     if (mGraphState.load() == PrebuiltGraphState::UNINITIALIZED) {
         return Status::ILLEGAL_STATE;
     }
     auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)(int, int64_t, const unsigned char*,
                                                             size_t))mFnSetInputStreamData;
     PrebuiltComputepipeRunner_ErrorCode errorCode =
-        mappedFn(streamIndex, timestamp, reinterpret_cast<const unsigned char*>(streamData.c_str()),
-                 streamData.length());
+            mappedFn(streamIndex, timestamp,
+                     reinterpret_cast<const unsigned char*>(streamData.c_str()),
+                     streamData.length());
     return static_cast<Status>(static_cast<int>(errorCode));
 }
 
-Status PrebuiltGraph::SetInputStreamPixelData(int streamIndex, int64_t timestamp,
-                                              const runner::InputFrame& inputFrame) {
+Status LocalPrebuiltGraph::SetInputStreamPixelData(int streamIndex, int64_t timestamp,
+                                                   const runner::InputFrame& inputFrame) {
     if (mGraphState.load() == PrebuiltGraphState::UNINITIALIZED) {
         return Status::ILLEGAL_STATE;
     }
 
-    auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)(
-        int, int64_t, const uint8_t*, int, int, int,
-        PrebuiltComputepipeRunner_PixelDataFormat))mFnSetInputStreamPixelData;
+    auto mappedFn =
+            (PrebuiltComputepipeRunner_ErrorCode(*)(int, int64_t, const uint8_t*, int, int, int,
+                                                    PrebuiltComputepipeRunner_PixelDataFormat))
+                    mFnSetInputStreamPixelData;
     PrebuiltComputepipeRunner_ErrorCode errorCode =
-        mappedFn(streamIndex, timestamp, inputFrame.getFramePtr(), inputFrame.getFrameInfo().width,
-                 inputFrame.getFrameInfo().height, inputFrame.getFrameInfo().stride,
-                 static_cast<PrebuiltComputepipeRunner_PixelDataFormat>(
-                     static_cast<int>(inputFrame.getFrameInfo().format)));
+            mappedFn(streamIndex, timestamp, inputFrame.getFramePtr(),
+                     inputFrame.getFrameInfo().width, inputFrame.getFrameInfo().height,
+                     inputFrame.getFrameInfo().stride,
+                     static_cast<PrebuiltComputepipeRunner_PixelDataFormat>(
+                             static_cast<int>(inputFrame.getFrameInfo().format)));
     return static_cast<Status>(static_cast<int>(errorCode));
 }
 
-Status PrebuiltGraph::StopGraphExecution(bool flushOutputFrames) {
+Status LocalPrebuiltGraph::StopGraphExecution(bool flushOutputFrames) {
     auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)(bool))mFnStopGraphExecution;
     PrebuiltComputepipeRunner_ErrorCode errorCode = mappedFn(flushOutputFrames);
     if (errorCode == PrebuiltComputepipeRunner_ErrorCode::SUCCESS) {
@@ -323,24 +332,24 @@
     return static_cast<Status>(static_cast<int>(errorCode));
 }
 
-Status PrebuiltGraph::StartGraphProfiling() {
+Status LocalPrebuiltGraph::StartGraphProfiling() {
     auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)())mFnStartGraphProfiling;
     PrebuiltComputepipeRunner_ErrorCode errorCode = mappedFn();
     return static_cast<Status>(static_cast<int>(errorCode));
 }
 
-Status PrebuiltGraph::StopGraphProfiling() {
+Status LocalPrebuiltGraph::StopGraphProfiling() {
     auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)())mFnStopGraphProfiling;
     PrebuiltComputepipeRunner_ErrorCode errorCode = mappedFn();
     return static_cast<Status>(static_cast<int>(errorCode));
 }
 
-std::string PrebuiltGraph::GetDebugInfo() {
+std::string LocalPrebuiltGraph::GetDebugInfo() {
     if (mGraphState.load() == PrebuiltGraphState::UNINITIALIZED) {
         return "";
     }
-    auto mappedFn =
-        (PrebuiltComputepipeRunner_ErrorCode(*)(unsigned char*, size_t, size_t*))mFnGetDebugInfo;
+    auto mappedFn = (PrebuiltComputepipeRunner_ErrorCode(*)(unsigned char*, size_t,
+                                                            size_t*))mFnGetDebugInfo;
 
     size_t debugInfoSize = 0;
     PrebuiltComputepipeRunner_ErrorCode errorCode = mappedFn(nullptr, 0, &debugInfoSize);
@@ -355,36 +364,53 @@
                        reinterpret_cast<char*>(&debugInfo[0]) + debugInfo.size());
 }
 
-void PrebuiltGraph::OutputStreamCallbackFunction(void* cookie, int streamIndex, int64_t timestamp,
-                                                 const unsigned char* data, size_t data_size) {
-    PrebuiltGraph* graph = reinterpret_cast<PrebuiltGraph*>(cookie);
+void LocalPrebuiltGraph::OutputStreamCallbackFunction(void* cookie, int streamIndex,
+                                                      int64_t timestamp, const unsigned char* data,
+                                                      size_t data_size) {
+    LocalPrebuiltGraph* graph = reinterpret_cast<LocalPrebuiltGraph*>(cookie);
     CHECK(graph);
-    graph->mEngineInterface->DispatchSerializedData(streamIndex, timestamp,
-                                                    std::string(data, data + data_size));
-}
-
-void PrebuiltGraph::OutputPixelStreamCallbackFunction(void* cookie, int streamIndex,
-                                                      int64_t timestamp, const uint8_t* pixels,
-                                                      int width, int height, int step, int format) {
-    PrebuiltGraph* graph = reinterpret_cast<PrebuiltGraph*>(cookie);
-    CHECK(graph);
-    runner::InputFrame frame(height, width, static_cast<PixelFormat>(format), step, pixels);
-
-    graph->mEngineInterface->DispatchPixelData(streamIndex, timestamp, frame);
-}
-
-void PrebuiltGraph::GraphTerminationCallbackFunction(void* cookie,
-                                                     const unsigned char* termination_message,
-                                                     size_t termination_message_size) {
-    PrebuiltGraph* graph = reinterpret_cast<PrebuiltGraph*>(cookie);
-    CHECK(graph);
-    std::string errorMessage = "";
-    if (termination_message != nullptr && termination_message_size > 0) {
-        std::string(termination_message, termination_message + termination_message_size);
+    std::shared_ptr<PrebuiltEngineInterface> engineInterface = graph->mEngineInterface.lock();
+    if (engineInterface != nullptr) {
+        engineInterface->DispatchSerializedData(streamIndex, timestamp,
+                                                std::string(data, data + data_size));
     }
-    graph->mGraphState.store(PrebuiltGraphState::STOPPED);
-    graph->mEngineInterface->DispatchGraphTerminationMessage(graph->GetStatus(),
-                                                             std::move(errorMessage));
+}
+
+void LocalPrebuiltGraph::OutputPixelStreamCallbackFunction(void* cookie, int streamIndex,
+                                                           int64_t timestamp, const uint8_t* pixels,
+                                                           int width, int height, int step,
+                                                           int format) {
+    LocalPrebuiltGraph* graph = reinterpret_cast<LocalPrebuiltGraph*>(cookie);
+    CHECK(graph);
+    std::shared_ptr<PrebuiltEngineInterface> engineInterface = graph->mEngineInterface.lock();
+
+    if (engineInterface) {
+        runner::InputFrame frame(height, width, static_cast<PixelFormat>(format), step, pixels);
+        engineInterface->DispatchPixelData(streamIndex, timestamp, frame);
+    }
+}
+
+void LocalPrebuiltGraph::GraphTerminationCallbackFunction(void* cookie,
+                                                          const unsigned char* termination_message,
+                                                          size_t termination_message_size) {
+    LocalPrebuiltGraph* graph = reinterpret_cast<LocalPrebuiltGraph*>(cookie);
+    CHECK(graph);
+    std::shared_ptr<PrebuiltEngineInterface> engineInterface = graph->mEngineInterface.lock();
+
+    if (engineInterface) {
+        std::string errorMessage = "";
+        if (termination_message != nullptr && termination_message_size > 0) {
+            std::string(termination_message, termination_message + termination_message_size);
+        }
+        graph->mGraphState.store(PrebuiltGraphState::STOPPED);
+        engineInterface->DispatchGraphTerminationMessage(graph->GetStatus(),
+                                                         std::move(errorMessage));
+    }
+}
+
+PrebuiltGraph* GetLocalGraphFromLibrary(const std::string& prebuilt_library,
+                                        std::weak_ptr<PrebuiltEngineInterface> engineInterface) {
+    return LocalPrebuiltGraph::GetPrebuiltGraphFromLibrary(prebuilt_library, engineInterface);
 }
 
 }  // namespace graph
diff --git a/computepipe/runner/graph/LocalPrebuiltGraph.h b/computepipe/runner/graph/LocalPrebuiltGraph.h
new file mode 100644
index 0000000..4a59037
--- /dev/null
+++ b/computepipe/runner/graph/LocalPrebuiltGraph.h
@@ -0,0 +1,155 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef COMPUTEPIPE_RUNNER_GRAPH_GRPC_GRAPH_H
+#define COMPUTEPIPE_RUNNER_GRAPH_GRPC_GRAPH_H
+
+#include <functional>
+#include <shared_mutex>
+#include <string>
+
+#include "ClientConfig.pb.h"
+#include "InputFrame.h"
+#include "Options.pb.h"
+#include "PrebuiltEngineInterface.h"
+#include "PrebuiltGraph.h"
+#include "RunnerComponent.h"
+#include "types/Status.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace graph {
+
+class LocalPrebuiltGraph : public PrebuiltGraph {
+
+  private:
+    // Private constructor
+    LocalPrebuiltGraph() {
+    }
+
+  public:
+    ~LocalPrebuiltGraph();
+
+    // No copy or move constructors or operators are available.
+    LocalPrebuiltGraph(const LocalPrebuiltGraph&) = delete;
+    LocalPrebuiltGraph& operator=(const LocalPrebuiltGraph&) = delete;
+
+    // Override RunnerComponent interface functions for applying configs,
+    // starting the graph and stopping the graph.
+    Status handleConfigPhase(const runner::ClientConfig& e) override;
+    Status handleExecutionPhase(const runner::RunnerEvent& e) override;
+    Status handleStopWithFlushPhase(const runner::RunnerEvent& e) override;
+    Status handleStopImmediatePhase(const runner::RunnerEvent& e) override;
+    Status handleResetPhase(const runner::RunnerEvent& e) override;
+
+    static LocalPrebuiltGraph* GetPrebuiltGraphFromLibrary(
+        const std::string& prebuiltLib, std::weak_ptr<PrebuiltEngineInterface> engineInterface);
+
+    PrebuiltGraphType GetGraphType() const override {
+        return PrebuiltGraphType::LOCAL;
+    }
+
+    PrebuiltGraphState GetGraphState() const override {
+        return mGraphState;
+    }
+
+    Status GetStatus() const override;
+
+    std::string GetErrorMessage() const override;
+
+    // Gets the supported graph config options.
+    const proto::Options& GetSupportedGraphConfigs() const override {
+        return mGraphConfig;
+    }
+
+    // Sets input stream data. The string is expected to be a serialized proto
+    // the definition of which is known to the graph.
+    Status SetInputStreamData(int streamIndex, int64_t timestamp,
+                              const std::string& streamData) override;
+
+    // Sets pixel data to the specified input stream index.
+    Status SetInputStreamPixelData(int streamIndex, int64_t timestamp,
+                                   const runner::InputFrame& inputFrame) override;
+
+    Status StartGraphProfiling() override;
+
+    Status StopGraphProfiling() override;
+
+    // Collects debugging and profiling information for the graph. The graph
+    // needs to be started with debugging enabled in order to get valid info.
+    std::string GetDebugInfo() override;
+
+  private:
+    // Starts the graph execution.
+    Status StartGraphExecution(bool debuggingEnabled);
+
+    // Stops the graph execution.
+    Status StopGraphExecution(bool flushOutputFrames);
+
+    // Callback functions. The class has a C++ function callback interface while it deals with pure
+    // C functions underneath that do not have object context. We need to have these static
+    // functions that need to be passed to the C interface.
+    static void OutputPixelStreamCallbackFunction(void* cookie, int streamIndex, int64_t timestamp,
+                                                  const uint8_t* pixels, int width, int height,
+                                                  int step, int format);
+    static void OutputStreamCallbackFunction(void* cookie, int streamIndex, int64_t timestamp,
+                                             const unsigned char* data, size_t dataSize);
+    static void GraphTerminationCallbackFunction(void* cookie,
+                                                 const unsigned char* terminationMessage,
+                                                 size_t terminationMessageSize);
+
+    // Cached callback interface that is passed in from the runner.
+    std::weak_ptr<PrebuiltEngineInterface> mEngineInterface;
+
+    static std::mutex mCreationMutex;
+    static LocalPrebuiltGraph* mPrebuiltGraphInstance;
+
+    // Even though mutexes are generally preferred over atomics, the only varialble in this class
+    // that changes after initialization is graph state and that is the only vairable that needs
+    // to be guarded. The prebuilt is internally assumed to be thread safe, so that concurrent
+    // calls into the library will automatically be handled in a thread safe manner by the it.
+    std::atomic<PrebuiltGraphState> mGraphState = PrebuiltGraphState::UNINITIALIZED;
+
+    // Dynamic library handle
+    void* mHandle;
+
+    // Repeated function calls need not be made to get the graph version and the config is this is
+    // constant through the operation of the graph. These values are just cached as strings.
+    std::string mGraphVersion;
+    proto::Options mGraphConfig;
+
+    // Cached functions from the dynamic library.
+    void* mFnGetErrorCode;
+    void* mFnGetErrorMessage;
+    void* mFnUpdateGraphConfig;
+    void* mFnResetGraph;
+    void* mFnSetInputStreamData;
+    void* mFnSetInputStreamPixelData;
+    void* mFnSetOutputStreamCallback;
+    void* mFnSetOutputPixelStreamCallback;
+    void* mFnSetGraphTerminationCallback;
+    void* mFnStartGraphExecution;
+    void* mFnStopGraphExecution;
+    void* mFnStartGraphProfiling;
+    void* mFnStopGraphProfiling;
+    void* mFnGetDebugInfo;
+};
+
+}  // namespace graph
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
+
+#endif  // #define COMPUTEPIPE_RUNNER_GRAPH_GRPC_GRAPH_H
diff --git a/computepipe/runner/graph/StreamSetObserver.cpp b/computepipe/runner/graph/StreamSetObserver.cpp
new file mode 100644
index 0000000..30e3b54
--- /dev/null
+++ b/computepipe/runner/graph/StreamSetObserver.cpp
@@ -0,0 +1,197 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "StreamSetObserver.h"
+
+#include <android-base/logging.h>
+#include <grpcpp/grpcpp.h>
+
+#include "ClientConfig.pb.h"
+#include "GrpcGraph.h"
+#include "InputFrame.h"
+#include "RunnerComponent.h"
+#include "prebuilt_interface.h"
+#include "types/Status.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace graph {
+
+SingleStreamObserver::SingleStreamObserver(int streamId, EndOfStreamReporter* endOfStreamReporter,
+                                           StreamGraphInterface* streamGraphInterface) :
+      mStreamId(streamId),
+      mEndOfStreamReporter(endOfStreamReporter),
+      mStreamGraphInterface(streamGraphInterface) {}
+
+Status SingleStreamObserver::startObservingStream() {
+    {
+        std::lock_guard lock(mStopObservationLock);
+        mStopped = false;
+    }
+    mThread = std::thread([this]() {
+        proto::ObserveOutputStreamRequest observeStreamRequest;
+        observeStreamRequest.set_stream_id(mStreamId);
+        ::grpc::ClientContext context;
+        ::grpc::CompletionQueue cq;
+
+        void* tag;
+        bool cqStatus;
+
+        std::unique_ptr<::grpc::ClientAsyncReader<proto::OutputStreamResponse>> rpc(
+                mStreamGraphInterface->getServiceStub()
+                        ->AsyncObserveOutputStream(&context, observeStreamRequest, &cq, nullptr));
+
+        proto::OutputStreamResponse response;
+
+        cq.Next(&tag, &cqStatus);
+        while (cqStatus) {
+            // Dispatch data only stream is being observed.
+            rpc->Read(&response, nullptr);
+            {
+                std::lock_guard lock(mStopObservationLock);
+                if (mStopped || mStreamGraphInterface == nullptr) {
+                    LOG(INFO) << "Graph stopped. ";
+                    break;
+                }
+
+                // Since this is a separate thread, we need not worry about recursive locking
+                // and callback can be executed without creating a new thread.
+                if (response.has_pixel_data()) {
+                    proto::PixelData pixels = response.pixel_data();
+                    runner::InputFrame frame(pixels.height(), pixels.width(),
+                                             static_cast<PixelFormat>(
+                                                     static_cast<int>(pixels.format())),
+                                             pixels.step(),
+                                             reinterpret_cast<const unsigned char*>(
+                                                     pixels.data().c_str()));
+                    mStreamGraphInterface->dispatchPixelData(mStreamId, response.timestamp_us(),
+                                                             frame);
+                } else if (response.has_semantic_data()) {
+                    mStreamGraphInterface
+                            ->dispatchSerializedData(mStreamId, response.timestamp_us(),
+                                                     std::move(*response.mutable_semantic_data()));
+                }
+            }
+
+            cq.Next(&tag, &cqStatus);
+        }
+
+        ::grpc::Status grpcStatus;
+        rpc->Finish(&grpcStatus, nullptr);
+        if (!grpcStatus.ok()) {
+            LOG(ERROR) << "Failed RPC with message: " << grpcStatus.error_message();
+        }
+
+        cq.Shutdown();
+        if (mEndOfStreamReporter) {
+            std::lock_guard lock(mStopObservationLock);
+            mStopped = true;
+            std::thread t =
+                    std::thread([this]() { mEndOfStreamReporter->reportStreamClosed(mStreamId); });
+
+            t.detach();
+        }
+
+        proto::OutputStreamResponse streamResponse;
+    });
+
+    return Status::SUCCESS;
+}
+
+void SingleStreamObserver::stopObservingStream() {
+    std::lock_guard lock(mStopObservationLock);
+    mStopped = true;
+}
+
+SingleStreamObserver::~SingleStreamObserver() {
+    stopObservingStream();
+
+    if (mThread.joinable()) {
+        mThread.join();
+    }
+}
+
+StreamSetObserver::StreamSetObserver(const runner::ClientConfig& clientConfig,
+                                     StreamGraphInterface* streamGraphInterface) :
+      mClientConfig(clientConfig), mStreamGraphInterface(streamGraphInterface) {}
+
+Status StreamSetObserver::startObservingStreams() {
+    std::lock_guard lock(mLock);
+    std::map<int, int> outputConfigs = {};
+    mClientConfig.getOutputStreamConfigs(outputConfigs);
+
+    if (!mStopped || !mStreamObservers.empty()) {
+        LOG(ERROR) << "Already started observing streams. Duplicate call is not allowed";
+        return Status::ILLEGAL_STATE;
+    }
+
+    for (const auto& it : outputConfigs) {
+        auto streamObserver =
+                std::make_unique<SingleStreamObserver>(it.first, this, mStreamGraphInterface);
+        Status status = streamObserver->startObservingStream();
+        if (status != Status::SUCCESS) {
+            std::thread t([this]() { stopObservingStreams(true); });
+            t.detach();
+            return status;
+        }
+        mStreamObservers.emplace(std::make_pair(it.first, std::move(streamObserver)));
+    }
+
+    mStopped = mStreamObservers.empty();
+    return Status::SUCCESS;
+}
+
+void StreamSetObserver::stopObservingStreams(bool stopImmediately) {
+    std::unique_lock lock(mLock);
+    if (mStopped) {
+        // Separate thread is necessary here to avoid recursive locking.
+        std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
+            streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
+        });
+        t.detach();
+        return;
+    }
+
+    // Wait for the streams to close if we are not stopping immediately.
+    if (stopImmediately) {
+        for (auto& it : mStreamObservers) {
+            it.second->stopObservingStream();
+        }
+
+        mStoppedCv.wait(lock, [this]() -> bool { return mStopped; });
+    }
+}
+
+void StreamSetObserver::reportStreamClosed(int streamId) {
+    std::lock_guard lock(mLock);
+    auto streamObserver = mStreamObservers.find(streamId);
+    if (streamObserver == mStreamObservers.end()) {
+        return;
+    }
+    mStreamObservers.erase(streamObserver);
+    if (mStreamObservers.empty()) {
+        mStopped = true;
+        mStoppedCv.notify_one();
+        std::thread t([streamGraphInterface(mStreamGraphInterface)]() {
+            streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
+        });
+        t.detach();
+    }
+}
+
+}  // namespace graph
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
diff --git a/computepipe/runner/graph/StreamSetObserver.h b/computepipe/runner/graph/StreamSetObserver.h
new file mode 100644
index 0000000..7999f18
--- /dev/null
+++ b/computepipe/runner/graph/StreamSetObserver.h
@@ -0,0 +1,103 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef COMPUTEPIPE_RUNNER_GRAPH_STREAM_SET_OBSERVER_H
+#define COMPUTEPIPE_RUNNER_GRAPH_STREAM_SET_OBSERVER_H
+
+#include <condition_variable>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "GrpcPrebuiltGraphService.grpc.pb.h"
+#include "GrpcPrebuiltGraphService.pb.h"
+#include "InputFrame.h"
+#include "RunnerComponent.h"
+#include "types/Status.h"
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace graph {
+
+class GrpcGraph;
+
+class EndOfStreamReporter {
+  public:
+    virtual ~EndOfStreamReporter() = default;
+
+    virtual void reportStreamClosed(int streamId) = 0;
+};
+
+class StreamGraphInterface {
+  public:
+    virtual ~StreamGraphInterface() = default;
+
+    virtual void dispatchPixelData(int streamId, int64_t timestamp_us,
+                                   const runner::InputFrame& frame) = 0;
+
+    virtual void dispatchSerializedData(int streamId, int64_t timestamp_us,
+                                        std::string&& serialized_data) = 0;
+
+    virtual void dispatchGraphTerminationMessage(Status, std::string&&) = 0;
+
+    virtual proto::GrpcGraphService::Stub* getServiceStub() = 0;
+};
+
+class SingleStreamObserver {
+  public:
+    SingleStreamObserver(int streamId, EndOfStreamReporter* endOfStreamReporter,
+                         StreamGraphInterface* streamGraphInterface);
+
+    virtual ~SingleStreamObserver();
+
+    Status startObservingStream();
+
+    void stopObservingStream();
+  private:
+    int mStreamId;
+    EndOfStreamReporter* mEndOfStreamReporter;
+    StreamGraphInterface* mStreamGraphInterface;
+    std::thread mThread;
+    bool mStopped = true;
+    std::mutex mStopObservationLock;
+};
+
+class StreamSetObserver : public EndOfStreamReporter {
+  public:
+    StreamSetObserver(const runner::ClientConfig& clientConfig,
+                      StreamGraphInterface* streamGraphInterface);
+
+    Status startObservingStreams();
+
+    void stopObservingStreams(bool stopImmediately);
+
+    void reportStreamClosed(int streamId) override;
+  private:
+    const runner::ClientConfig& mClientConfig;
+    StreamGraphInterface* mStreamGraphInterface;
+    std::map<int, std::unique_ptr<SingleStreamObserver>> mStreamObservers;
+    std::mutex mLock;
+    std::condition_variable mStoppedCv;
+    bool mStopped = true;
+};
+
+}  // namespace graph
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
+
+#endif  // #define COMPUTEPIPE_RUNNER_GRAPH_STREAM_SET_OBSERVER_H
diff --git a/computepipe/runner/graph/include/PrebuiltGraph.h b/computepipe/runner/graph/include/PrebuiltGraph.h
index 88f7282..43ef754 100644
--- a/computepipe/runner/graph/include/PrebuiltGraph.h
+++ b/computepipe/runner/graph/include/PrebuiltGraph.h
@@ -15,11 +15,8 @@
 #ifndef COMPUTEPIPE_RUNNER_GRAPH_INCLUDE_PREBUILTGRAPH_H_
 #define COMPUTEPIPE_RUNNER_GRAPH_INCLUDE_PREBUILTGRAPH_H_
 
-#include <functional>
-#include <shared_mutex>
 #include <string>
 
-#include "ClientConfig.pb.h"
 #include "InputFrame.h"
 #include "Options.pb.h"
 #include "PrebuiltEngineInterface.h"
@@ -38,119 +35,54 @@
     STOPPED,
 };
 
-// PrebuiltGraph is a singleton class. This is because the underlying functions that it implements
-// are C functions that carry around state.
+enum PrebuiltGraphType {
+    LOCAL = 0,
+    REMOTE = 1,
+};
+
 class PrebuiltGraph : public runner::RunnerComponentInterface {
-  private:
-    // Private constructor
-    PrebuiltGraph() {
-    }
-
   public:
-    ~PrebuiltGraph();
+    // Gets the graph type.
+    virtual PrebuiltGraphType GetGraphType() const = 0;
 
-    // No copy or move constructors or operators are available.
-    PrebuiltGraph(const PrebuiltGraph&) = delete;
-    PrebuiltGraph& operator=(const PrebuiltGraph&) = delete;
+    // Gets the PrebuiltGraphState
+    virtual PrebuiltGraphState GetGraphState() const = 0;
 
-    // Override RunnerComponent interface functions for applying configs,
-    // starting the graph and stopping the graph.
-    Status handleConfigPhase(const runner::ClientConfig& e) override;
-    Status handleExecutionPhase(const runner::RunnerEvent& e) override;
-    Status handleStopWithFlushPhase(const runner::RunnerEvent& e) override;
-    Status handleStopImmediatePhase(const runner::RunnerEvent& e) override;
-    Status handleResetPhase(const runner::RunnerEvent& e) override;
+    // Gets the graph status and reports any error code or if the status is OK.
+    virtual Status GetStatus() const = 0;
 
-    static PrebuiltGraph* GetPrebuiltGraphFromLibrary(
-        const std::string& prebuiltLib, std::shared_ptr<PrebuiltEngineInterface> engineInterface);
-
-    PrebuiltGraphState GetGraphState() const {
-        return mGraphState;
-    }
-
-    Status GetStatus() const;
-
-    std::string GetErrorMessage() const;
+    // Gets the error message from the graph.
+    virtual std::string GetErrorMessage() const = 0;
 
     // Gets the supported graph config options.
-    const proto::Options& GetSupportedGraphConfigs() const {
-        return mGraphConfig;
-    }
+    virtual const proto::Options& GetSupportedGraphConfigs() const = 0;
 
     // Sets input stream data. The string is expected to be a serialized proto
     // the definition of which is known to the graph.
-    Status SetInputStreamData(int streamIndex, int64_t timestamp, const std::string& streamData);
+    virtual Status SetInputStreamData(int streamIndex, int64_t timestamp,
+                                      const std::string& streamData) = 0;
 
     // Sets pixel data to the specified input stream index.
-    Status SetInputStreamPixelData(int streamIndex, int64_t timestamp,
-                                   const runner::InputFrame& inputFrame);
+    virtual Status SetInputStreamPixelData(int streamIndex, int64_t timestamp,
+                                           const runner::InputFrame& inputFrame) = 0;
 
     // Start graph profiling.
-    Status StartGraphProfiling();
+    virtual Status StartGraphProfiling() = 0;
 
     // Stop graph profiling.
-    Status StopGraphProfiling();
+    virtual Status StopGraphProfiling() = 0;
 
     // Collects debugging and profiling information for the graph. The graph
     // needs to be started with debugging enabled in order to get valid info.
-    std::string GetDebugInfo();
-
-  private:
-    // Starts the graph execution.
-    Status StartGraphExecution(bool debuggingEnabled);
-
-    // Stops the graph execution.
-    Status StopGraphExecution(bool flushOutputFrames);
-
-    // Callback functions. The class has a C++ function callback interface while it deals with pure
-    // C functions underneath that do not have object context. We need to have these static
-    // functions that need to be passed to the C interface.
-    static void OutputPixelStreamCallbackFunction(void* cookie, int streamIndex, int64_t timestamp,
-                                                  const uint8_t* pixels, int width, int height,
-                                                  int step, int format);
-    static void OutputStreamCallbackFunction(void* cookie, int streamIndex, int64_t timestamp,
-                                             const unsigned char* data, size_t dataSize);
-    static void GraphTerminationCallbackFunction(void* cookie,
-                                                 const unsigned char* terminationMessage,
-                                                 size_t terminationMessageSize);
-
-    // Cached callback interface that is passed in from the runner.
-    std::shared_ptr<PrebuiltEngineInterface> mEngineInterface;
-
-    static std::mutex mCreationMutex;
-    static PrebuiltGraph* mPrebuiltGraphInstance;
-
-    // Even though mutexes are generally preferred over atomics, the only varialble in this class
-    // that changes after initialization is graph state and that is the only vairable that needs
-    // to be guarded. The prebuilt is internally assumed to be thread safe, so that concurrent
-    // calls into the library will automatically be handled in a thread safe manner by the it.
-    std::atomic<PrebuiltGraphState> mGraphState = PrebuiltGraphState::UNINITIALIZED;
-
-    // Dynamic library handle
-    void* mHandle;
-
-    // Repeated function calls need not be made to get the graph version and the config is this is
-    // constant through the operation of the graph. These values are just cached as strings.
-    std::string mGraphVersion;
-    proto::Options mGraphConfig;
-
-    // Cached functions from the dynamic library.
-    void* mFnGetErrorCode;
-    void* mFnGetErrorMessage;
-    void* mFnUpdateGraphConfig;
-    void* mFnResetGraph;
-    void* mFnSetInputStreamData;
-    void* mFnSetInputStreamPixelData;
-    void* mFnSetOutputStreamCallback;
-    void* mFnSetOutputPixelStreamCallback;
-    void* mFnSetGraphTerminationCallback;
-    void* mFnStartGraphExecution;
-    void* mFnStopGraphExecution;
-    void* mFnStartGraphProfiling;
-    void* mFnStopGraphProfiling;
-    void* mFnGetDebugInfo;
+    virtual std::string GetDebugInfo() = 0;
 };
 
+PrebuiltGraph* GetLocalGraphFromLibrary(
+        const std::string& prebuiltLib, std::weak_ptr<PrebuiltEngineInterface> engineInterface);
+
+std::unique_ptr<PrebuiltGraph> GetRemoteGraphFromAddress(
+        const std::string& address, std::weak_ptr<PrebuiltEngineInterface> engineInterface);
+
 }  // namespace graph
 }  // namespace computepipe
 }  // namespace automotive
diff --git a/computepipe/runner/graph/proto/Android.bp b/computepipe/runner/graph/proto/Android.bp
new file mode 100644
index 0000000..2911a52
--- /dev/null
+++ b/computepipe/runner/graph/proto/Android.bp
@@ -0,0 +1,69 @@
+genrule {
+    name: "computepipe_grpc_graph_proto_h",
+    tools: [
+        "aprotoc",
+        "protoc-gen-grpc-cpp-plugin",
+    ],
+    cmd: "$(location aprotoc) -I$$(dirname $(in)) -Iexternal/protobuf/src --plugin=protoc-gen-grpc=$(location protoc-gen-grpc-cpp-plugin) $(in) --grpc_out=$(genDir) --cpp_out=$(genDir)",
+    srcs: [
+        "GrpcPrebuiltGraphService.proto",
+    ],
+    out: [
+        "GrpcPrebuiltGraphService.grpc.pb.h",
+        "GrpcPrebuiltGraphService.pb.h",
+    ],
+}
+
+genrule {
+    name: "computepipe_grpc_graph_proto_cc",
+    tools: [
+        "aprotoc",
+        "protoc-gen-grpc-cpp-plugin",
+    ],
+    cmd: "$(location aprotoc) -I$$(dirname $(in)) -Iexternal/protobuf/src --plugin=protoc-gen-grpc=$(location protoc-gen-grpc-cpp-plugin) $(in) --grpc_out=$(genDir) --cpp_out=$(genDir)",
+    srcs: [
+        "GrpcPrebuiltGraphService.proto",
+    ],
+    out: [
+        "GrpcPrebuiltGraphService.grpc.pb.cc",
+        "GrpcPrebuiltGraphService.pb.cc",
+    ],
+}
+
+cc_library {
+    name: "computepipe_grpc_graph_proto",
+    proto: {
+        type: "lite",
+        export_proto_headers: true,
+    },
+    include_dirs: [
+        "external/protobuf/src",
+    ],
+    generated_headers: [
+        "computepipe_grpc_graph_proto_h",
+    ],
+    export_generated_headers: [
+        "computepipe_grpc_graph_proto_h",
+    ],
+    generated_sources: [
+        "computepipe_grpc_graph_proto_cc",
+    ],
+    cflags: [
+        "-Wall",
+        "-Werror",
+        "-Wno-unused-parameter",
+    ],
+    host_supported: false,
+    vendor_available: true,
+    target: {
+        android: {
+            proto: {
+                type: "lite",
+            },
+            shared_libs: [
+                "libprotobuf-cpp-full",
+                "libgrpc++",
+            ],
+        },
+    },
+}
diff --git a/computepipe/runner/graph/proto/GrpcPrebuiltGraphService.proto b/computepipe/runner/graph/proto/GrpcPrebuiltGraphService.proto
new file mode 100644
index 0000000..125b866
--- /dev/null
+++ b/computepipe/runner/graph/proto/GrpcPrebuiltGraphService.proto
@@ -0,0 +1,102 @@
+syntax = "proto2";
+
+package android.automotive.computepipe.proto;
+
+enum RemoteGraphStatusCode {
+    SUCCESS = 0;
+    INTERNAL_ERROR = 1;
+    INVALID_ARGUMENT = 2;
+    ILLEGAL_STATE = 3;
+    NO_MEMORY = 4;
+    FATAL_ERROR = 5;
+}
+
+message StatusResponse {
+    optional RemoteGraphStatusCode code = 1;
+    optional string message = 2;
+}
+
+message GraphOptionsRequest {}
+
+message GraphOptionsResponse {
+    optional string serialized_options = 1;
+}
+
+message SetGraphConfigRequest {
+    optional string serialized_config = 1;
+}
+
+message OutputStreamMetadata {
+    optional int32 stream_id = 1;
+}
+
+message ObserveOutputStreamRequest {
+    optional int32 stream_id = 1;
+}
+
+enum PixelFormat {
+    RGB = 0;
+    RGBA = 1;
+    GRAY = 2;
+}
+
+message PixelData {
+    optional int32 width = 1;
+    optional int32 height = 2;
+    optional int32 step = 3;
+    optional PixelFormat format = 4;
+    optional bytes data = 5;
+}
+
+message OutputStreamResponse {
+    oneof data {
+        string semantic_data = 1;
+        PixelData pixel_data = 2;
+    }
+    optional int32 stream_id = 3;
+    optional int64 timestamp_us = 4;
+}
+
+message SetDebugRequest {
+    optional bool enabled = 1;
+}
+
+message ProfilingDataRequest {}
+
+message ProfilingDataResponse {
+    optional string data = 1;
+}
+
+message StartGraphExecutionRequest {}
+
+message StopGraphExecutionRequest {
+    optional bool stop_immediate = 1;
+}
+
+message StartGraphProfilingRequest {}
+
+message StopGraphProfilingRequest {}
+
+message ResetGraphRequest {}
+
+service GrpcGraphService {
+    rpc GetGraphOptions(GraphOptionsRequest) returns (GraphOptionsResponse) {}
+
+    rpc SetGraphConfig(SetGraphConfigRequest) returns (StatusResponse) {}
+
+    rpc SetDebugOption(SetDebugRequest) returns (StatusResponse) {}
+
+    rpc StartGraphExecution(StartGraphExecutionRequest) returns (StatusResponse) {}
+
+    rpc ObserveOutputStream(ObserveOutputStreamRequest) returns (stream OutputStreamResponse) {}
+
+    rpc StopGraphExecution(StopGraphExecutionRequest) returns (StatusResponse) {}
+
+    rpc ResetGraph(ResetGraphRequest) returns (StatusResponse) {}
+
+    rpc StartGraphProfiling(StartGraphProfilingRequest) returns (StatusResponse) {}
+
+    rpc StopGraphProfiling(StopGraphProfilingRequest) returns (StatusResponse) {}
+
+    rpc GetProfilingData(ProfilingDataRequest) returns (ProfilingDataResponse) {}
+}
diff --git a/computepipe/tests/runner/graph/Android.bp b/computepipe/tests/runner/graph/Android.bp
index 10e40f2..14271dd 100644
--- a/computepipe/tests/runner/graph/Android.bp
+++ b/computepipe/tests/runner/graph/Android.bp
@@ -17,7 +17,7 @@
     test_suites: ["device-tests"],
     srcs: [
         "EnumConversionTest.cpp",
-        "PrebuiltGraphTest.cpp",
+        "LocalPrebuiltGraphTest.cpp",
     ],
     static_libs: [
         "computepipe_prebuilt_graph",
@@ -40,5 +40,40 @@
         "packages/services/Car/computepipe",
         "packages/services/Car/computepipe/runner/graph",
     ],
+}
 
+cc_test {
+    name: "computepipe_grpc_graph_test",
+    cflags: [
+        "-Wall",
+        "-Werror",
+        "-Wextra",
+        "-Wno-unused-parameter",
+    ],
+    test_suites: ["device-tests"],
+    srcs: [
+        "GrpcGraphTest.cpp",
+    ],
+    static_libs: [
+        "computepipe_grpc_graph_proto",
+        "computepipe_runner_component",
+        "libgtest",
+        "libgmock",
+    ],
+    shared_libs: [
+        "computepipe_grpc_graph",
+        "libbase",
+	      "libcomputepipeprotos",
+        "libgrpc++",
+        "libdl",
+        "liblog",
+        "libprotobuf-cpp-full",
+    ],
+    header_libs: [
+        "computepipe_runner_includes",
+    ],
+    include_dirs: [
+        "packages/services/Car/computepipe",
+        "packages/services/Car/computepipe/runner/graph",
+    ],
 }
diff --git a/computepipe/tests/runner/graph/GrpcGraphTest.cpp b/computepipe/tests/runner/graph/GrpcGraphTest.cpp
new file mode 100644
index 0000000..883ef6f
--- /dev/null
+++ b/computepipe/tests/runner/graph/GrpcGraphTest.cpp
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include <android-base/logging.h>
+#include <grpc++/grpc++.h>
+
+#include "ClientConfig.pb.h"
+#include "GrpcPrebuiltGraphService.grpc.pb.h"
+#include "GrpcPrebuiltGraphService.pb.h"
+#include "Options.pb.h"
+#include "PrebuiltEngineInterface.h"
+#include "PrebuiltGraph.h"
+#include "RunnerComponent.h"
+#include "gmock/gmock-matchers.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "types/Status.h"
+
+using ::android::automotive::computepipe::runner::ClientConfig;
+using ::android::automotive::computepipe::runner::RunnerComponentInterface;
+using ::android::automotive::computepipe::runner::RunnerEvent;
+using ::testing::HasSubstr;
+
+namespace android {
+namespace automotive {
+namespace computepipe {
+namespace graph {
+namespace {
+
+constexpr char kGraphName[] = "Dummy graph name";
+constexpr char kSetGraphConfigMessage[] = "Dummy set config message";
+constexpr char kSetDebugOptionMessage[] = "Dummy set debug option message";
+constexpr char kStartGraphMessage[] = "Dummy start graph message";
+constexpr char kStopGraphMessage[] = "Dummy stop graph message";
+constexpr char kOutputStreamPacket[] = "Dummy output stream packet";
+constexpr char kResetGraphMessage[] = "ResetGraphMessage";
+
+// This is a barebones synchronous server implementation. A better implementation would be an
+// asynchronous implementation and it is upto the graph provider to do that. This implementation
+// is very specific to tests being conducted here.
+class GrpcGraphServerImpl : public proto::GrpcGraphService::Service {
+private:
+    std::string mServerAddress;
+    std::unique_ptr<::grpc::Server> mServer;
+    std::mutex mLock;
+    std::condition_variable mShutdownCv;
+    bool mShutdown = false;
+
+public:
+    explicit GrpcGraphServerImpl(std::string address) : mServerAddress(address) {}
+
+    virtual ~GrpcGraphServerImpl() {
+        if (mServer) {
+            mServer->Shutdown();
+            std::unique_lock lock(mLock);
+            if (!mShutdown) {
+                mShutdownCv.wait_for(lock, std::chrono::seconds(10),
+                                     [this]() { return mShutdown; });
+            }
+        }
+    }
+
+    void startServer() {
+        if (mServer == nullptr) {
+            ::grpc::ServerBuilder builder;
+            builder.RegisterService(this);
+            builder.AddListeningPort(mServerAddress, ::grpc::InsecureServerCredentials());
+            mServer = builder.BuildAndStart();
+            mServer->Wait();
+            std::lock_guard lock(mLock);
+            mShutdown = true;
+            mShutdownCv.notify_one();
+        }
+    }
+
+    ::grpc::Status GetGraphOptions(::grpc::ServerContext* context,
+                                   const proto::GraphOptionsRequest* request,
+                                   proto::GraphOptionsResponse* response) override {
+        proto::Options options;
+        options.set_graph_name(kGraphName);
+        response->set_serialized_options(options.SerializeAsString());
+        return ::grpc::Status::OK;
+    }
+
+    ::grpc::Status SetGraphConfig(::grpc::ServerContext* context,
+                                  const proto::SetGraphConfigRequest* request,
+                                  proto::StatusResponse* response) override {
+        response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
+        response->set_message(kSetGraphConfigMessage);
+        return ::grpc::Status::OK;
+    }
+
+    ::grpc::Status SetDebugOption(::grpc::ServerContext* context,
+                                  const proto::SetDebugRequest* request,
+                                  proto::StatusResponse* response) override {
+        response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
+        response->set_message(kSetDebugOptionMessage);
+        return ::grpc::Status::OK;
+    }
+
+    ::grpc::Status StartGraphExecution(::grpc::ServerContext* context,
+                                       const proto::StartGraphExecutionRequest* request,
+                                       proto::StatusResponse* response) override {
+        response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
+        response->set_message(kStartGraphMessage);
+        return ::grpc::Status::OK;
+    }
+
+    ::grpc::Status ObserveOutputStream(
+            ::grpc::ServerContext* context, const proto::ObserveOutputStreamRequest* request,
+            ::grpc::ServerWriter<proto::OutputStreamResponse>* writer) override {
+        // Write as many output packets as stream id. This is just to test different number of
+        // packets received with each stream. Also write even numbered stream as a pixel packet
+        // and odd numbered stream as a data packet.
+        for (int i = 0; i < request->stream_id(); i++) {
+            proto::OutputStreamResponse response;
+            if (request->stream_id() % 2 == 0) {
+                response.mutable_pixel_data()->set_data(kOutputStreamPacket);
+                response.mutable_pixel_data()->set_height(1);
+                response.mutable_pixel_data()->set_width(sizeof(kOutputStreamPacket));
+                response.mutable_pixel_data()->set_step(sizeof(kOutputStreamPacket));
+                response.mutable_pixel_data()->set_format(proto::PixelFormat::GRAY);
+                EXPECT_TRUE(response.has_pixel_data());
+            } else {
+                response.set_semantic_data(kOutputStreamPacket);
+                EXPECT_TRUE(response.has_semantic_data());
+            }
+            if (!writer->Write(response)) {
+                return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost");
+            }
+        }
+
+        return ::grpc::Status::OK;
+    }
+
+    ::grpc::Status StopGraphExecution(::grpc::ServerContext* context,
+                                      const proto::StopGraphExecutionRequest* request,
+                                      proto::StatusResponse* response) override {
+        response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
+        response->set_message(kStopGraphMessage);
+        return ::grpc::Status::OK;
+    }
+
+    ::grpc::Status ResetGraph(::grpc::ServerContext* context,
+                              const proto::ResetGraphRequest* request,
+                              proto::StatusResponse* response) override {
+        response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
+        response->set_message(kResetGraphMessage);
+        return ::grpc::Status::OK;
+    }
+
+    ::grpc::Status GetProfilingData(::grpc::ServerContext* context,
+                                    const proto::ProfilingDataRequest* request,
+                                    proto::ProfilingDataResponse* response) {
+        response->set_data(kSetGraphConfigMessage);
+        return ::grpc::Status::OK;
+    }
+};
+
+class PrebuiltEngineInterfaceImpl : public PrebuiltEngineInterface {
+private:
+    std::map<int, int> mNumPacketsPerStream;
+    std::mutex mLock;
+    std::condition_variable mCv;
+    bool mGraphTerminated = false;
+
+public:
+    // Prebuilt to engine interface
+    void DispatchPixelData(int streamId, int64_t timestamp,
+                           const runner::InputFrame& frame) override {
+        ASSERT_EQ(streamId % 2, 0);
+        std::lock_guard lock(mLock);
+        if (mNumPacketsPerStream.find(streamId) == mNumPacketsPerStream.end()) {
+            mNumPacketsPerStream[streamId] = 1;
+        } else {
+            mNumPacketsPerStream[streamId]++;
+        }
+    }
+
+    void DispatchSerializedData(int streamId, int64_t timestamp, std::string&& data) override {
+        ASSERT_EQ(streamId % 2, 1);
+        std::lock_guard lock(mLock);
+        if (mNumPacketsPerStream.find(streamId) == mNumPacketsPerStream.end()) {
+            mNumPacketsPerStream[streamId] = 1;
+        } else {
+            mNumPacketsPerStream[streamId]++;
+        }
+    }
+
+    void DispatchGraphTerminationMessage(Status status, std::string&& msg) override {
+        std::lock_guard lock(mLock);
+        mGraphTerminated = true;
+        mCv.notify_one();
+    }
+
+    bool waitForTermination() {
+        std::unique_lock lock(mLock);
+        if (!mGraphTerminated) {
+            mCv.wait_for(lock, std::chrono::seconds(10), [this] { return mGraphTerminated; });
+        }
+        return mGraphTerminated;
+    }
+
+    int numPacketsForStream(int streamId) {
+        std::lock_guard lock(mLock);
+        auto it = mNumPacketsPerStream.find(streamId);
+        if (it == mNumPacketsPerStream.end()) {
+            return 0;
+        }
+        return it->second;
+    }
+};
+
+class GrpcGraphTest : public ::testing::Test {
+private:
+    std::unique_ptr<GrpcGraphServerImpl> mServer;
+    std::shared_ptr<PrebuiltEngineInterfaceImpl> mEngine;
+    std::string mAddress = "[::]:10000";
+
+public:
+    std::unique_ptr<PrebuiltGraph> mGrpcGraph;
+
+    void SetUp() override {
+        mServer = std::make_unique<GrpcGraphServerImpl>(mAddress);
+        std::thread t = std::thread([this]() { mServer->startServer(); });
+        t.detach();
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+
+        mEngine = std::make_shared<PrebuiltEngineInterfaceImpl>();
+        mGrpcGraph = GetRemoteGraphFromAddress(mAddress, mEngine);
+        ASSERT_TRUE(mGrpcGraph != nullptr);
+        EXPECT_EQ(mGrpcGraph->GetSupportedGraphConfigs().graph_name(), kGraphName);
+        EXPECT_EQ(mGrpcGraph->GetGraphType(), PrebuiltGraphType::REMOTE);
+    }
+
+    void TearDown() override { mServer.reset(); }
+
+    bool waitForTermination() { return mEngine->waitForTermination(); }
+
+    int numPacketsForStream(int streamId) { return mEngine->numPacketsForStream(streamId); }
+};
+
+class TestRunnerEvent : public runner::RunnerEvent {
+    bool isPhaseEntry() const override { return true; }
+    bool isTransitionComplete() const override { return false; }
+    bool isAborted() const override { return false; }
+    Status dispatchToComponent(const std::shared_ptr<runner::RunnerComponentInterface>&) override {
+        return Status::SUCCESS;
+    };
+};
+
+// Test to see if stop with flush produces exactly as many packets as expected. The number
+// of packets produced by stopImmediate is variable as the number of packets already dispatched
+// when stop is called is variable.
+TEST_F(GrpcGraphTest, EndToEndTestOnStopWithFlush) {
+    std::map<int, int> outputConfigs = {{5, 1}, {6, 1}};
+    runner::ClientConfig clientConfig(0, 0, 0, outputConfigs, proto::ProfilingType::DISABLED);
+
+    EXPECT_EQ(mGrpcGraph->handleConfigPhase(clientConfig), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::STOPPED);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    TestRunnerEvent e;
+    EXPECT_EQ(mGrpcGraph->handleExecutionPhase(e), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::RUNNING);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    EXPECT_EQ(mGrpcGraph->handleStopWithFlushPhase(e), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::FLUSHING);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    EXPECT_TRUE(waitForTermination());
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::STOPPED);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+    EXPECT_EQ(numPacketsForStream(5), 5);
+    EXPECT_EQ(numPacketsForStream(6), 6);
+}
+
+TEST_F(GrpcGraphTest, GraphStopCallbackProducedOnImmediateStop) {
+    std::map<int, int> outputConfigs = {{5, 1}, {6, 1}};
+    runner::ClientConfig clientConfig(0, 0, 0, outputConfigs, proto::ProfilingType::DISABLED);
+
+    EXPECT_EQ(mGrpcGraph->handleConfigPhase(clientConfig), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::STOPPED);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    TestRunnerEvent e;
+    EXPECT_EQ(mGrpcGraph->handleExecutionPhase(e), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::RUNNING);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    EXPECT_EQ(mGrpcGraph->handleStopImmediatePhase(e), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::STOPPED);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    EXPECT_TRUE(waitForTermination());
+}
+
+TEST_F(GrpcGraphTest, GraphStopCallbackProducedOnFlushedStopWithNoOutputStreams) {
+    std::map<int, int> outputConfigs = {};
+    runner::ClientConfig clientConfig(0, 0, 0, outputConfigs, proto::ProfilingType::DISABLED);
+    EXPECT_EQ(mGrpcGraph->handleConfigPhase(clientConfig), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::STOPPED);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    TestRunnerEvent e;
+    EXPECT_EQ(mGrpcGraph->handleExecutionPhase(e), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetGraphState(), PrebuiltGraphState::RUNNING);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    EXPECT_EQ(mGrpcGraph->handleStopWithFlushPhase(e), Status::SUCCESS);
+    EXPECT_EQ(mGrpcGraph->GetStatus(), Status::SUCCESS);
+
+    EXPECT_TRUE(waitForTermination());
+}
+
+TEST_F(GrpcGraphTest, SetInputStreamsFailAsExpected) {
+    runner::InputFrame frame(0, 0, static_cast<PixelFormat>(0), 0, nullptr);
+    EXPECT_EQ(mGrpcGraph->SetInputStreamData(0, 0, ""), Status::FATAL_ERROR);
+    EXPECT_EQ(mGrpcGraph->SetInputStreamPixelData(0, 0, frame), Status::FATAL_ERROR);
+}
+
+}  // namespace
+}  // namespace graph
+}  // namespace computepipe
+}  // namespace automotive
+}  // namespace android
diff --git a/computepipe/tests/runner/graph/PrebuiltGraphTest.cpp b/computepipe/tests/runner/graph/LocalPrebuiltGraphTest.cpp
similarity index 79%
rename from computepipe/tests/runner/graph/PrebuiltGraphTest.cpp
rename to computepipe/tests/runner/graph/LocalPrebuiltGraphTest.cpp
index a8e417e..d63ec51 100644
--- a/computepipe/tests/runner/graph/PrebuiltGraphTest.cpp
+++ b/computepipe/tests/runner/graph/LocalPrebuiltGraphTest.cpp
@@ -16,14 +16,14 @@
 #include <string>
 
 #include "ClientConfig.pb.h"
+#include "LocalPrebuiltGraph.h"
 #include "PrebuiltEngineInterface.h"
-#include "PrebuiltGraph.h"
+#include "ProfilingType.pb.h"
 #include "RunnerComponent.h"
 #include "gmock/gmock-matchers.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "types/Status.h"
-#include "ProfilingType.pb.h"
 
 using ::android::automotive::computepipe::runner::ClientConfig;
 using ::android::automotive::computepipe::runner::RunnerComponentInterface;
@@ -42,12 +42,12 @@
 typedef std::function<void(int, int64_t, std::string&&)> SerializedStreamCallback;
 typedef std::function<void(Status, std::string&&)> GraphTerminationCallback;
 class PrebuiltEngineInterfaceImpl : public PrebuiltEngineInterface {
-  private:
+private:
     PixelCallback mPixelCallbackFn;
     SerializedStreamCallback mSerializedStreamCallbackFn;
     GraphTerminationCallback mGraphTerminationCallbackFn;
 
-  public:
+public:
     virtual ~PrebuiltEngineInterfaceImpl() = default;
 
     void DispatchPixelData(int streamId, int64_t timestamp,
@@ -63,9 +63,7 @@
         mGraphTerminationCallbackFn(status, std::move(msg));
     }
 
-    void SetPixelCallback(PixelCallback callback) {
-        mPixelCallbackFn = callback;
-    }
+    void SetPixelCallback(PixelCallback callback) { mPixelCallbackFn = callback; }
 
     void SetSerializedStreamCallback(SerializedStreamCallback callback) {
         mSerializedStreamCallbackFn = callback;
@@ -87,26 +85,26 @@
 // The above two properties are used to test that the prebuilt graph wrapper calls the correct
 // functions and callbacks are issued as expected. These tests do not test the internals of the
 // graph themselves and such tests must be written along with the graph implementation.
-TEST(PrebuiltGraphTest, FunctionMappingFromLibraryIsSuccessful) {
+TEST(LocalPrebuiltGraphTest, FunctionMappingFromLibraryIsSuccessful) {
     PrebuiltEngineInterfaceImpl callback;
     std::shared_ptr<PrebuiltEngineInterface> engineInterface =
-        std::static_pointer_cast<PrebuiltEngineInterface, PrebuiltEngineInterfaceImpl>(
-            std::make_shared<PrebuiltEngineInterfaceImpl>(callback));
-    PrebuiltGraph* graph =
-        PrebuiltGraph::GetPrebuiltGraphFromLibrary("libstubgraphimpl.so", engineInterface);
+            std::static_pointer_cast<PrebuiltEngineInterface, PrebuiltEngineInterfaceImpl>(
+                    std::make_shared<PrebuiltEngineInterfaceImpl>(callback));
+    PrebuiltGraph* graph = GetLocalGraphFromLibrary("libstubgraphimpl.so", engineInterface);
     ASSERT_TRUE(graph);
+    EXPECT_EQ(graph->GetGraphType(), PrebuiltGraphType::LOCAL);
     EXPECT_NE(graph->GetGraphState(), PrebuiltGraphState::UNINITIALIZED);
     EXPECT_EQ(graph->GetSupportedGraphConfigs().graph_name(), "stub_graph");
 }
 
-TEST(PrebuiltGraphTest, GraphConfigurationIssuesCorrectFunctionCalls) {
+TEST(LocalPrebuiltGraphTest, GraphConfigurationIssuesCorrectFunctionCalls) {
     PrebuiltEngineInterfaceImpl callback;
     std::shared_ptr<PrebuiltEngineInterface> engineInterface =
-        std::static_pointer_cast<PrebuiltEngineInterface, PrebuiltEngineInterfaceImpl>(
-            std::make_shared<PrebuiltEngineInterfaceImpl>(callback));
-    PrebuiltGraph* graph =
-        PrebuiltGraph::GetPrebuiltGraphFromLibrary("libstubgraphimpl.so", engineInterface);
+            std::static_pointer_cast<PrebuiltEngineInterface, PrebuiltEngineInterfaceImpl>(
+                    std::make_shared<PrebuiltEngineInterfaceImpl>(callback));
+    PrebuiltGraph* graph = GetLocalGraphFromLibrary("libstubgraphimpl.so", engineInterface);
     ASSERT_TRUE(graph);
+    EXPECT_EQ(graph->GetGraphType(), PrebuiltGraphType::LOCAL);
     ASSERT_NE(graph->GetGraphState(), PrebuiltGraphState::UNINITIALIZED);
 
     graph->GetSupportedGraphConfigs();
@@ -124,13 +122,13 @@
     EXPECT_THAT(functionVisited, HasSubstr("GetErrorCode"));
 }
 
-TEST(PrebuiltGraphTest, GraphOperationEndToEndIsSuccessful) {
+TEST(LocalPrebuiltGraphTest, GraphOperationEndToEndIsSuccessful) {
     bool graphHasTerminated = false;
     int numOutputStreamCallbacksReceived[4] = {0, 0, 0, 0};
 
     PrebuiltEngineInterfaceImpl callback;
     callback.SetGraphTerminationCallback(
-        [&graphHasTerminated](Status, std::string) { graphHasTerminated = true; });
+            [&graphHasTerminated](Status, std::string) { graphHasTerminated = true; });
 
     // Add multiple pixel stream callback functions to see if all of them register.
     callback.SetPixelCallback([&numOutputStreamCallbacksReceived](int streamIndex, int64_t,
@@ -141,18 +139,18 @@
 
     // Add multiple stream callback functions to see if all of them register.
     callback.SetSerializedStreamCallback(
-        [&numOutputStreamCallbacksReceived](int streamIndex, int64_t, std::string&&) {
-            ASSERT_TRUE(streamIndex == 2 || streamIndex == 3);
-            numOutputStreamCallbacksReceived[streamIndex]++;
-        });
+            [&numOutputStreamCallbacksReceived](int streamIndex, int64_t, std::string&&) {
+                ASSERT_TRUE(streamIndex == 2 || streamIndex == 3);
+                numOutputStreamCallbacksReceived[streamIndex]++;
+            });
 
     std::shared_ptr<PrebuiltEngineInterface> engineInterface =
-        std::static_pointer_cast<PrebuiltEngineInterface, PrebuiltEngineInterfaceImpl>(
-            std::make_shared<PrebuiltEngineInterfaceImpl>(callback));
+            std::static_pointer_cast<PrebuiltEngineInterface, PrebuiltEngineInterfaceImpl>(
+                    std::make_shared<PrebuiltEngineInterfaceImpl>(callback));
 
-    PrebuiltGraph* graph =
-        PrebuiltGraph::GetPrebuiltGraphFromLibrary("libstubgraphimpl.so", engineInterface);
+    PrebuiltGraph* graph = GetLocalGraphFromLibrary("libstubgraphimpl.so", engineInterface);
 
+    EXPECT_EQ(graph->GetGraphType(), PrebuiltGraphType::LOCAL);
     ASSERT_NE(graph->GetGraphState(), PrebuiltGraphState::UNINITIALIZED);
 
     graph->GetSupportedGraphConfigs();
@@ -171,19 +169,19 @@
 
     runner::InputFrame inputFrame(0, 0, PixelFormat::RGB, 0, nullptr);
     EXPECT_EQ(graph->SetInputStreamPixelData(
-                  /*streamIndex =*/0, /*timestamp =*/0, /*inputFrame =*/inputFrame),
+                      /*streamIndex =*/0, /*timestamp =*/0, /*inputFrame =*/inputFrame),
               Status::SUCCESS);
     EXPECT_EQ(graph->SetInputStreamPixelData(
-                  /*streamIndex =*/0, /*timestamp =*/0, /*inputFrame =*/inputFrame),
+                      /*streamIndex =*/0, /*timestamp =*/0, /*inputFrame =*/inputFrame),
               Status::SUCCESS);
     EXPECT_EQ(graph->SetInputStreamPixelData(
-                  /*streamIndex =*/0, /*timestamp =*/0, /*inputFrame =*/inputFrame),
+                      /*streamIndex =*/0, /*timestamp =*/0, /*inputFrame =*/inputFrame),
               Status::SUCCESS);
     EXPECT_EQ(graph->SetInputStreamPixelData(
-                  /*streamIndex =*/1, /*timestamp =*/0, /*inputFrame =*/inputFrame),
+                      /*streamIndex =*/1, /*timestamp =*/0, /*inputFrame =*/inputFrame),
               Status::SUCCESS);
     EXPECT_EQ(graph->SetInputStreamPixelData(
-                  /*streamIndex =*/1, /*timestamp =*/0, /*inputFrame =*/inputFrame),
+                      /*streamIndex =*/1, /*timestamp =*/0, /*inputFrame =*/inputFrame),
               Status::SUCCESS);
     functionVisited = graph->GetErrorMessage();
     EXPECT_THAT(functionVisited, HasSubstr("SetInputStreamPixelData"));