Yogeshwar Nagaraj | f8c8d18 | 2020-04-01 13:09:08 -0700 | [diff] [blame] | 1 | // Copyright (C) 2020 The Android Open Source Project |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | #include "GrpcGraph.h" |
| 16 | |
| 17 | #include <cstdlib> |
| 18 | |
| 19 | #include <android-base/logging.h> |
| 20 | #include <grpcpp/grpcpp.h> |
| 21 | |
| 22 | #include "ClientConfig.pb.h" |
| 23 | #include "GrpcGraph.h" |
| 24 | #include "InputFrame.h" |
| 25 | #include "RunnerComponent.h" |
| 26 | #include "prebuilt_interface.h" |
| 27 | #include "types/Status.h" |
| 28 | |
| 29 | namespace android { |
| 30 | namespace automotive { |
| 31 | namespace computepipe { |
| 32 | namespace graph { |
| 33 | namespace { |
| 34 | constexpr int64_t kRpcDeadlineMilliseconds = 100; |
| 35 | |
| 36 | template <class ResponseType, class RpcType> |
| 37 | std::pair<Status, std::string> FinishRpcAndGetResult( |
| 38 | ::grpc::ClientAsyncResponseReader<RpcType>* rpc, ::grpc::CompletionQueue* cq, |
| 39 | ResponseType* response) { |
| 40 | int random_tag = rand(); |
| 41 | ::grpc::Status grpcStatus; |
| 42 | rpc->Finish(response, &grpcStatus, reinterpret_cast<void*>(random_tag)); |
| 43 | bool ok = false; |
| 44 | void* got_tag; |
| 45 | if (!cq->Next(&got_tag, &ok)) { |
| 46 | LOG(ERROR) << "Unexpected shutdown of the completion queue"; |
| 47 | return std::pair(Status::FATAL_ERROR, "Unexpected shutdown of the completion queue"); |
| 48 | } |
| 49 | |
| 50 | if (!ok) { |
| 51 | LOG(ERROR) << "Unable to complete RPC request"; |
| 52 | return std::pair(Status::FATAL_ERROR, "Unable to complete RPC request"); |
| 53 | } |
| 54 | |
| 55 | CHECK_EQ(got_tag, reinterpret_cast<void*>(random_tag)); |
| 56 | if (!grpcStatus.ok()) { |
| 57 | std::string error_message = |
| 58 | std::string("Grpc failed with error: ") + grpcStatus.error_message(); |
| 59 | LOG(ERROR) << error_message; |
| 60 | return std::pair(Status::FATAL_ERROR, std::move(error_message)); |
| 61 | } |
| 62 | |
| 63 | return std::pair(Status::SUCCESS, std::string("")); |
| 64 | } |
| 65 | |
| 66 | } // namespace |
| 67 | |
Kathan Shukla | 3cb6a83 | 2020-10-22 12:08:22 -0700 | [diff] [blame^] | 68 | GrpcGraph::~GrpcGraph() { |
| 69 | mStreamSetObserver.reset(); |
| 70 | } |
| 71 | |
Yogeshwar Nagaraj | f8c8d18 | 2020-04-01 13:09:08 -0700 | [diff] [blame] | 72 | PrebuiltGraphState GrpcGraph::GetGraphState() const { |
| 73 | std::lock_guard lock(mLock); |
| 74 | return mGraphState; |
| 75 | } |
| 76 | |
| 77 | Status GrpcGraph::GetStatus() const { |
| 78 | std::lock_guard lock(mLock); |
| 79 | return mStatus; |
| 80 | } |
| 81 | |
| 82 | std::string GrpcGraph::GetErrorMessage() const { |
| 83 | std::lock_guard lock(mLock); |
| 84 | return mErrorMessage; |
| 85 | } |
| 86 | |
| 87 | Status GrpcGraph::initialize(const std::string& address, |
| 88 | std::weak_ptr<PrebuiltEngineInterface> engineInterface) { |
| 89 | std::shared_ptr<::grpc::ChannelCredentials> creds = ::grpc::InsecureChannelCredentials(); |
| 90 | std::shared_ptr<::grpc::Channel> channel = ::grpc::CreateChannel(address, creds); |
| 91 | mGraphStub = proto::GrpcGraphService::NewStub(channel); |
| 92 | mEngineInterface = engineInterface; |
| 93 | |
| 94 | ::grpc::ClientContext context; |
| 95 | context.set_deadline(std::chrono::system_clock::now() + |
| 96 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 97 | ::grpc::CompletionQueue cq; |
| 98 | |
| 99 | proto::GraphOptionsRequest getGraphOptionsRequest; |
| 100 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::GraphOptionsResponse>> rpc( |
| 101 | mGraphStub->AsyncGetGraphOptions(&context, getGraphOptionsRequest, &cq)); |
| 102 | |
| 103 | proto::GraphOptionsResponse response; |
| 104 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 105 | |
| 106 | if (mStatus != Status::SUCCESS) { |
| 107 | LOG(ERROR) << "Failed to get graph options: " << mErrorMessage; |
| 108 | return Status::FATAL_ERROR; |
| 109 | } |
| 110 | |
| 111 | std::string serialized_options = response.serialized_options(); |
| 112 | if (!mGraphConfig.ParseFromString(serialized_options)) { |
| 113 | mErrorMessage = "Failed to parse graph options"; |
| 114 | LOG(ERROR) << "Failed to parse graph options"; |
| 115 | return Status::FATAL_ERROR; |
| 116 | } |
| 117 | |
| 118 | mGraphState = PrebuiltGraphState::STOPPED; |
| 119 | return Status::SUCCESS; |
| 120 | } |
| 121 | |
| 122 | // Function to confirm that there would be no further changes to the graph configuration. This |
| 123 | // needs to be called before starting the graph. |
| 124 | Status GrpcGraph::handleConfigPhase(const runner::ClientConfig& e) { |
| 125 | std::lock_guard lock(mLock); |
| 126 | if (mGraphState == PrebuiltGraphState::UNINITIALIZED) { |
| 127 | mStatus = Status::ILLEGAL_STATE; |
| 128 | return Status::ILLEGAL_STATE; |
| 129 | } |
| 130 | |
| 131 | // handleConfigPhase is a blocking call, so abort call is pointless for this RunnerEvent. |
| 132 | if (e.isAborted()) { |
| 133 | mStatus = Status::INVALID_ARGUMENT; |
| 134 | return mStatus; |
| 135 | } else if (e.isTransitionComplete()) { |
| 136 | mStatus = Status::SUCCESS; |
| 137 | return mStatus; |
| 138 | } |
| 139 | |
| 140 | ::grpc::ClientContext context; |
| 141 | context.set_deadline(std::chrono::system_clock::now() + |
| 142 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 143 | ::grpc::CompletionQueue cq; |
| 144 | |
| 145 | std::string serializedConfig = e.getSerializedClientConfig(); |
| 146 | proto::SetGraphConfigRequest setGraphConfigRequest; |
| 147 | setGraphConfigRequest.set_serialized_config(std::move(serializedConfig)); |
| 148 | |
| 149 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc( |
| 150 | mGraphStub->AsyncSetGraphConfig(&context, setGraphConfigRequest, &cq)); |
| 151 | |
| 152 | proto::StatusResponse response; |
| 153 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 154 | if (mStatus != Status::SUCCESS) { |
| 155 | LOG(ERROR) << "Rpc failed while trying to set configuration"; |
| 156 | return mStatus; |
| 157 | } |
| 158 | |
| 159 | if (response.code() != proto::RemoteGraphStatusCode::SUCCESS) { |
| 160 | LOG(ERROR) << "Failed to cofngure remote graph. " << response.message(); |
| 161 | } |
| 162 | |
| 163 | mStatus = static_cast<Status>(static_cast<int>(response.code())); |
| 164 | mErrorMessage = response.message(); |
| 165 | |
| 166 | mStreamSetObserver = std::make_unique<StreamSetObserver>(e, this); |
| 167 | |
| 168 | return mStatus; |
| 169 | } |
| 170 | |
| 171 | // Starts the graph. |
| 172 | Status GrpcGraph::handleExecutionPhase(const runner::RunnerEvent& e) { |
| 173 | std::lock_guard lock(mLock); |
| 174 | if (mGraphState != PrebuiltGraphState::STOPPED) { |
| 175 | mStatus = Status::ILLEGAL_STATE; |
| 176 | return mStatus; |
| 177 | } |
| 178 | |
| 179 | if (e.isAborted()) { |
| 180 | // Starting the graph is a blocking call and cannot be aborted in between. |
| 181 | mStatus = Status::INVALID_ARGUMENT; |
| 182 | return mStatus; |
| 183 | } else if (e.isTransitionComplete()) { |
| 184 | mStatus = Status::SUCCESS; |
| 185 | return mStatus; |
| 186 | } |
| 187 | |
| 188 | // Start observing the output streams |
| 189 | mStatus = mStreamSetObserver->startObservingStreams(); |
| 190 | if (mStatus != Status::SUCCESS) { |
| 191 | mErrorMessage = "Failed to observe output streams"; |
| 192 | return mStatus; |
| 193 | } |
| 194 | |
| 195 | ::grpc::ClientContext context; |
| 196 | context.set_deadline(std::chrono::system_clock::now() + |
| 197 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 198 | |
| 199 | proto::StartGraphExecutionRequest startExecutionRequest; |
| 200 | ::grpc::CompletionQueue cq; |
| 201 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc( |
| 202 | mGraphStub->AsyncStartGraphExecution(&context, startExecutionRequest, &cq)); |
| 203 | |
| 204 | proto::StatusResponse response; |
| 205 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 206 | if (mStatus != Status::SUCCESS) { |
| 207 | LOG(ERROR) << "Failed to start graph execution"; |
| 208 | return mStatus; |
| 209 | } |
| 210 | |
| 211 | mStatus = static_cast<Status>(static_cast<int>(response.code())); |
| 212 | mErrorMessage = response.message(); |
| 213 | |
| 214 | if (mStatus == Status::SUCCESS) { |
| 215 | mGraphState = PrebuiltGraphState::RUNNING; |
| 216 | } |
| 217 | |
| 218 | return mStatus; |
| 219 | } |
| 220 | |
| 221 | // Stops the graph while letting the graph flush output packets in flight. |
| 222 | Status GrpcGraph::handleStopWithFlushPhase(const runner::RunnerEvent& e) { |
| 223 | std::lock_guard lock(mLock); |
| 224 | if (mGraphState != PrebuiltGraphState::RUNNING) { |
| 225 | return Status::ILLEGAL_STATE; |
| 226 | } |
| 227 | |
| 228 | if (e.isAborted()) { |
| 229 | return Status::INVALID_ARGUMENT; |
| 230 | } else if (e.isTransitionComplete()) { |
| 231 | return Status::SUCCESS; |
| 232 | } |
| 233 | |
| 234 | ::grpc::ClientContext context; |
| 235 | context.set_deadline(std::chrono::system_clock::now() + |
| 236 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 237 | |
| 238 | proto::StopGraphExecutionRequest stopExecutionRequest; |
| 239 | stopExecutionRequest.set_stop_immediate(false); |
| 240 | ::grpc::CompletionQueue cq; |
| 241 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc( |
| 242 | mGraphStub->AsyncStopGraphExecution(&context, stopExecutionRequest, &cq)); |
| 243 | |
| 244 | proto::StatusResponse response; |
| 245 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 246 | if (mStatus != Status::SUCCESS) { |
| 247 | LOG(ERROR) << "Failed to stop graph execution"; |
| 248 | return Status::FATAL_ERROR; |
| 249 | } |
| 250 | |
| 251 | // Stop observing streams immendiately. |
| 252 | mStreamSetObserver->stopObservingStreams(false); |
| 253 | |
| 254 | mStatus = static_cast<Status>(static_cast<int>(response.code())); |
| 255 | mErrorMessage = response.message(); |
| 256 | |
| 257 | if (mStatus == Status::SUCCESS) { |
| 258 | mGraphState = PrebuiltGraphState::FLUSHING; |
| 259 | } |
| 260 | |
| 261 | return mStatus; |
| 262 | } |
| 263 | |
| 264 | // Stops the graph and cancels all the output packets. |
| 265 | Status GrpcGraph::handleStopImmediatePhase(const runner::RunnerEvent& e) { |
| 266 | std::lock_guard lock(mLock); |
| 267 | if (mGraphState != PrebuiltGraphState::RUNNING) { |
| 268 | return Status::ILLEGAL_STATE; |
| 269 | } |
| 270 | |
| 271 | if (e.isAborted()) { |
| 272 | return Status::INVALID_ARGUMENT; |
| 273 | } else if (e.isTransitionComplete()) { |
| 274 | return Status::SUCCESS; |
| 275 | } |
| 276 | |
| 277 | ::grpc::ClientContext context; |
| 278 | context.set_deadline(std::chrono::system_clock::now() + |
| 279 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 280 | |
| 281 | proto::StopGraphExecutionRequest stopExecutionRequest; |
| 282 | stopExecutionRequest.set_stop_immediate(true); |
| 283 | ::grpc::CompletionQueue cq; |
| 284 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc( |
| 285 | mGraphStub->AsyncStopGraphExecution(&context, stopExecutionRequest, &cq)); |
| 286 | |
| 287 | proto::StatusResponse response; |
| 288 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 289 | if (mStatus != Status::SUCCESS) { |
| 290 | LOG(ERROR) << "Failed to stop graph execution"; |
| 291 | return Status::FATAL_ERROR; |
| 292 | } |
| 293 | |
| 294 | mStatus = static_cast<Status>(static_cast<int>(response.code())); |
| 295 | mErrorMessage = response.message(); |
| 296 | |
| 297 | // Stop observing streams immendiately. |
| 298 | mStreamSetObserver->stopObservingStreams(true); |
| 299 | |
| 300 | if (mStatus == Status::SUCCESS) { |
| 301 | mGraphState = PrebuiltGraphState::STOPPED; |
| 302 | } |
| 303 | return mStatus; |
| 304 | } |
| 305 | |
| 306 | Status GrpcGraph::handleResetPhase(const runner::RunnerEvent& e) { |
| 307 | std::lock_guard lock(mLock); |
| 308 | if (mGraphState != PrebuiltGraphState::STOPPED) { |
| 309 | return Status::ILLEGAL_STATE; |
| 310 | } |
| 311 | |
| 312 | if (e.isAborted()) { |
| 313 | return Status::INVALID_ARGUMENT; |
| 314 | } else if (e.isTransitionComplete()) { |
| 315 | return Status::SUCCESS; |
| 316 | } |
| 317 | |
| 318 | ::grpc::ClientContext context; |
| 319 | context.set_deadline(std::chrono::system_clock::now() + |
| 320 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 321 | |
| 322 | proto::ResetGraphRequest resetGraphRequest; |
| 323 | ::grpc::CompletionQueue cq; |
| 324 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc( |
| 325 | mGraphStub->AsyncResetGraph(&context, resetGraphRequest, &cq)); |
| 326 | |
| 327 | proto::StatusResponse response; |
| 328 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 329 | if (mStatus != Status::SUCCESS) { |
| 330 | LOG(ERROR) << "Failed to stop graph execution"; |
| 331 | return Status::FATAL_ERROR; |
| 332 | } |
| 333 | |
| 334 | mStatus = static_cast<Status>(static_cast<int>(response.code())); |
| 335 | mErrorMessage = response.message(); |
| 336 | mStreamSetObserver.reset(); |
| 337 | |
| 338 | return mStatus; |
| 339 | } |
| 340 | |
| 341 | Status GrpcGraph::SetInputStreamData(int /*streamIndex*/, int64_t /*timestamp*/, |
| 342 | const std::string& /*streamData*/) { |
| 343 | LOG(ERROR) << "Cannot set input stream for remote graphs"; |
| 344 | return Status::FATAL_ERROR; |
| 345 | } |
| 346 | |
| 347 | Status GrpcGraph::SetInputStreamPixelData(int /*streamIndex*/, int64_t /*timestamp*/, |
| 348 | const runner::InputFrame& /*inputFrame*/) { |
| 349 | LOG(ERROR) << "Cannot set input streams for remote graphs"; |
| 350 | return Status::FATAL_ERROR; |
| 351 | } |
| 352 | |
| 353 | Status GrpcGraph::StartGraphProfiling() { |
| 354 | std::lock_guard lock(mLock); |
| 355 | if (mGraphState != PrebuiltGraphState::RUNNING) { |
| 356 | return Status::ILLEGAL_STATE; |
| 357 | } |
| 358 | |
| 359 | ::grpc::ClientContext context; |
| 360 | context.set_deadline(std::chrono::system_clock::now() + |
| 361 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 362 | |
| 363 | proto::StartGraphProfilingRequest startProfilingRequest; |
| 364 | ::grpc::CompletionQueue cq; |
| 365 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc( |
| 366 | mGraphStub->AsyncStartGraphProfiling(&context, startProfilingRequest, &cq)); |
| 367 | |
| 368 | proto::StatusResponse response; |
| 369 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 370 | if (mStatus != Status::SUCCESS) { |
| 371 | LOG(ERROR) << "Failed to start graph profiling"; |
| 372 | return Status::FATAL_ERROR; |
| 373 | } |
| 374 | |
| 375 | mStatus = static_cast<Status>(static_cast<int>(response.code())); |
| 376 | mErrorMessage = response.message(); |
| 377 | |
| 378 | return mStatus; |
| 379 | } |
| 380 | |
| 381 | Status GrpcGraph::StopGraphProfiling() { |
| 382 | // Stopping profiling after graph has already stopped can be a no-op |
| 383 | ::grpc::ClientContext context; |
| 384 | context.set_deadline(std::chrono::system_clock::now() + |
| 385 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 386 | |
| 387 | proto::StopGraphProfilingRequest stopProfilingRequest; |
| 388 | ::grpc::CompletionQueue cq; |
| 389 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::StatusResponse>> rpc( |
| 390 | mGraphStub->AsyncStopGraphProfiling(&context, stopProfilingRequest, &cq)); |
| 391 | |
| 392 | proto::StatusResponse response; |
| 393 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 394 | if (mStatus != Status::SUCCESS) { |
| 395 | LOG(ERROR) << "Failed to stop graph profiling"; |
| 396 | return Status::FATAL_ERROR; |
| 397 | } |
| 398 | |
| 399 | mStatus = static_cast<Status>(static_cast<int>(response.code())); |
| 400 | mErrorMessage = response.message(); |
| 401 | |
| 402 | return mStatus; |
| 403 | } |
| 404 | |
| 405 | std::string GrpcGraph::GetDebugInfo() { |
| 406 | ::grpc::ClientContext context; |
| 407 | context.set_deadline(std::chrono::system_clock::now() + |
| 408 | std::chrono::milliseconds(kRpcDeadlineMilliseconds)); |
| 409 | |
| 410 | proto::ProfilingDataRequest profilingDataRequest; |
| 411 | ::grpc::CompletionQueue cq; |
| 412 | std::unique_ptr<::grpc::ClientAsyncResponseReader<proto::ProfilingDataResponse>> rpc( |
| 413 | mGraphStub->AsyncGetProfilingData(&context, profilingDataRequest, &cq)); |
| 414 | |
| 415 | proto::ProfilingDataResponse response; |
| 416 | auto [mStatus, mErrorMessage] = FinishRpcAndGetResult(rpc.get(), &cq, &response); |
| 417 | if (mStatus != Status::SUCCESS) { |
| 418 | LOG(ERROR) << "Failed to get profiling info"; |
| 419 | return ""; |
| 420 | } |
| 421 | |
| 422 | return response.data(); |
| 423 | } |
| 424 | |
| 425 | void GrpcGraph::dispatchPixelData(int streamId, int64_t timestamp_us, |
| 426 | const runner::InputFrame& frame) { |
| 427 | std::shared_ptr<PrebuiltEngineInterface> engineInterface = mEngineInterface.lock(); |
| 428 | if (engineInterface) { |
| 429 | return engineInterface->DispatchPixelData(streamId, timestamp_us, frame); |
| 430 | } |
| 431 | } |
| 432 | |
| 433 | void GrpcGraph::dispatchSerializedData(int streamId, int64_t timestamp_us, |
| 434 | std::string&& serialized_data) { |
| 435 | std::shared_ptr<PrebuiltEngineInterface> engineInterface = mEngineInterface.lock(); |
| 436 | if (engineInterface) { |
| 437 | return engineInterface->DispatchSerializedData(streamId, timestamp_us, |
| 438 | std::move(serialized_data)); |
| 439 | } |
| 440 | } |
| 441 | |
| 442 | void GrpcGraph::dispatchGraphTerminationMessage(Status status, std::string&& errorMessage) { |
| 443 | std::lock_guard lock(mLock); |
| 444 | mErrorMessage = std::move(errorMessage); |
| 445 | mStatus = status; |
| 446 | mGraphState = PrebuiltGraphState::STOPPED; |
| 447 | std::shared_ptr<PrebuiltEngineInterface> engineInterface = mEngineInterface.lock(); |
| 448 | if (engineInterface) { |
| 449 | std::string errorMessageTmp = mErrorMessage; |
| 450 | engineInterface->DispatchGraphTerminationMessage(mStatus, std::move(errorMessageTmp)); |
| 451 | } |
| 452 | } |
| 453 | |
| 454 | std::unique_ptr<PrebuiltGraph> GetRemoteGraphFromAddress( |
| 455 | const std::string& address, std::weak_ptr<PrebuiltEngineInterface> engineInterface) { |
| 456 | auto prebuiltGraph = std::make_unique<GrpcGraph>(); |
| 457 | Status status = prebuiltGraph->initialize(address, engineInterface); |
| 458 | if (status != Status::SUCCESS) { |
| 459 | return nullptr; |
| 460 | } |
| 461 | |
| 462 | return prebuiltGraph; |
| 463 | } |
| 464 | |
| 465 | } // namespace graph |
| 466 | } // namespace computepipe |
| 467 | } // namespace automotive |
| 468 | } // namespace android |