blob: ab812d20a8ed2588010a9c2784aeccf8577dbedd [file] [log] [blame]
Hanumant Singhb10788f2020-01-16 13:34:00 -08001// Copyright (C) 2020 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#include "DefaultEngine.h"
16
17#include <android-base/logging.h>
18
19#include <algorithm>
20#include <cassert>
21#include <memory>
22#include <mutex>
23#include <thread>
24#include <vector>
25
26#include "ClientInterface.h"
27#include "EventGenerator.h"
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -080028#include "EvsDisplayManager.h"
Hanumant Singhb10788f2020-01-16 13:34:00 -080029#include "InputFrame.h"
30#include "PrebuiltGraph.h"
31
32namespace android {
33namespace automotive {
34namespace computepipe {
35namespace runner {
36namespace engine {
37
38using android::automotive::computepipe::graph::PrebuiltGraph;
39using android::automotive::computepipe::runner::client_interface::ClientInterface;
40using android::automotive::computepipe::runner::generator::DefaultEvent;
41using android::automotive::computepipe::runner::input_manager::InputEngineInterface;
42using android::automotive::computepipe::runner::stream_manager::StreamEngineInterface;
43using android::automotive::computepipe::runner::stream_manager::StreamManager;
44
45namespace {
46
/**
 * Extracts the numeric stream id from a command source tag of the form
 * "<component>:<id>" (for example "StreamManager:3").
 *
 * Everything following the first ':' is handed to std::stoi, so any text that
 * trails the digits is ignored. When the tag contains no ':' the whole string
 * is parsed, because npos + 1 wraps around to offset 0.
 *
 * @param source tag to parse; taken by reference so no copy is made per call.
 * @return the parsed id. std::stoi signals non-numeric or out-of-range input
 *         with std::invalid_argument / std::out_of_range.
 */
int getStreamIdFromSource(const std::string& source) {
    const auto pos = source.find(':');
    return std::stoi(source.substr(pos + 1));
}
51} // namespace
52
53void DefaultEngine::setClientInterface(std::unique_ptr<ClientInterface>&& client) {
54 mClient = std::move(client);
55}
56
57void DefaultEngine::setPrebuiltGraph(std::unique_ptr<PrebuiltGraph>&& graph) {
58 mGraph = std::move(graph);
59 mGraphDescriptor = mGraph->GetSupportedGraphConfigs();
Kathan Shukla691ad062020-03-18 15:33:42 -070060 if (mGraph->GetGraphType() == graph::PrebuiltGraphType::REMOTE ||
61 mGraphDescriptor.input_configs_size() == 0) {
Yogeshwar Nagarajf8c8d182020-04-01 13:09:08 -070062 mIgnoreInputManager = true;
63 }
Hanumant Singhb10788f2020-01-16 13:34:00 -080064}
65
66Status DefaultEngine::setArgs(std::string engine_args) {
67 auto pos = engine_args.find(kNoInputManager);
68 if (pos != std::string::npos) {
69 mIgnoreInputManager = true;
70 }
71 pos = engine_args.find(kDisplayStreamId);
72 if (pos == std::string::npos) {
73 return Status::SUCCESS;
74 }
75 mDisplayStream = std::stoi(engine_args.substr(pos + strlen(kDisplayStreamId)));
76 mConfigBuilder.setDebugDisplayStream(mDisplayStream);
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -080077 mDebugDisplayManager = std::make_unique<debug_display_manager::EvsDisplayManager>();
78 mDebugDisplayManager->setArgs(engine_args);
Hanumant Singhb10788f2020-01-16 13:34:00 -080079 return Status::SUCCESS;
80}
81
82Status DefaultEngine::activate() {
83 mConfigBuilder.reset();
84 mEngineThread = std::make_unique<std::thread>(&DefaultEngine::processCommands, this);
85 return mClient->activate();
86}
87
88Status DefaultEngine::processClientConfigUpdate(const proto::ConfigurationCommand& command) {
89 // TODO check current phase
90 std::lock_guard<std::mutex> lock(mEngineLock);
91 if (mCurrentPhase != kResetPhase) {
92 return Status::ILLEGAL_STATE;
93 }
94 if (command.has_set_input_source()) {
95 mConfigBuilder =
96 mConfigBuilder.updateInputConfigOption(command.set_input_source().source_id());
97 } else if (command.has_set_termination_option()) {
98 mConfigBuilder = mConfigBuilder.updateTerminationOption(
99 command.set_termination_option().termination_option_id());
100 } else if (command.has_set_output_stream()) {
101 mConfigBuilder = mConfigBuilder.updateOutputStreamOption(
102 command.set_output_stream().stream_id(),
103 command.set_output_stream().max_inflight_packets_count());
104 } else if (command.has_set_offload_offload()) {
105 mConfigBuilder =
106 mConfigBuilder.updateOffloadOption(command.set_offload_offload().offload_option_id());
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700107 } else if (command.has_set_profile_options()) {
108 mConfigBuilder =
109 mConfigBuilder.updateProfilingType(command.set_profile_options().profile_type());
Hanumant Singhb10788f2020-01-16 13:34:00 -0800110 } else {
111 return SUCCESS;
112 }
113 return Status::SUCCESS;
114}
115
116Status DefaultEngine::processClientCommand(const proto::ControlCommand& command) {
117 // TODO check current phase
118 std::lock_guard<std::mutex> lock(mEngineLock);
119
120 if (command.has_apply_configs()) {
121 if (mCurrentPhase != kResetPhase) {
122 return Status::ILLEGAL_STATE;
123 }
124 queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_CONFIG);
125 return Status::SUCCESS;
126 }
127 if (command.has_start_graph()) {
128 if (mCurrentPhase != kConfigPhase) {
129 return Status::ILLEGAL_STATE;
130 }
131 queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_START_RUN);
132 return Status::SUCCESS;
133 }
134 if (command.has_stop_graph()) {
135 if (mCurrentPhase != kRunPhase) {
136 return Status::ILLEGAL_STATE;
137 }
138 mStopFromClient = true;
139 queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_INITIATE_STOP);
140 return Status::SUCCESS;
141 }
142 if (command.has_death_notification()) {
Hanumant Singhaaa5a462020-03-02 14:57:06 -0800143 if (mCurrentPhase == kResetPhase) {
144 /**
145 * The runner is already in reset state, no need to broadcast client death
146 * to components
147 */
148 LOG(INFO) << "client death notification with no configuration";
149 return Status::SUCCESS;
150 }
151 mCurrentPhaseError = std::make_unique<ComponentError>("ClientInterface", "Client death",
152 mCurrentPhase, false);
Hanumant Singhb10788f2020-01-16 13:34:00 -0800153 mWakeLooper.notify_all();
154 return Status::SUCCESS;
155 }
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700156 if (command.has_reset_configs()) {
157 if (mCurrentPhase != kConfigPhase) {
158 return Status::ILLEGAL_STATE;
159 }
160 queueCommand("ClientInterface", EngineCommand::Type::RESET_CONFIG);
161 return Status::SUCCESS;
162 }
163 if (command.has_start_pipe_profile()) {
164 if (mCurrentPhase != kRunPhase) {
165 return Status::ILLEGAL_STATE;
166 }
167 if (mGraph) {
168 return mGraph->StartGraphProfiling();
169 }
170 return Status::SUCCESS;
171 }
172 if (command.has_stop_pipe_profile()) {
173 if (mCurrentPhase != kRunPhase) {
174 return Status::SUCCESS;
175 }
176 if (mGraph) {
177 return mGraph->StopGraphProfiling();
178 }
179 return Status::SUCCESS;
180 }
181 if (command.has_release_debugger()) {
182 if (mCurrentPhase != kConfigPhase && mCurrentPhase != kResetPhase) {
183 return Status::ILLEGAL_STATE;
184 }
185 queueCommand("ClientInterface", EngineCommand::Type::RELEASE_DEBUGGER);
186 }
187 if (command.has_read_debug_data()) {
188 queueCommand("ClientInterface", EngineCommand::Type::READ_PROFILING);
189 return Status::SUCCESS;
190 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800191 return Status::SUCCESS;
192}
193
Yogeshwar Nagaraj65a5f902020-01-27 13:51:53 -0800194Status DefaultEngine::freePacket(int bufferId, int streamId) {
195 if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
196 LOG(ERROR)
197 << "Unable to find the stream manager corresponding to the id for freeing the packet.";
198 return Status::INVALID_ARGUMENT;
199 }
200 return mStreamManagers[streamId]->freePacket(bufferId);
Hanumant Singhb10788f2020-01-16 13:34:00 -0800201}
202
203/**
204 * Methods from PrebuiltEngineInterface
205 */
Kathan Shukla12595b82020-02-05 10:27:15 -0800206void DefaultEngine::DispatchPixelData(int streamId, int64_t timestamp, const InputFrame& frame) {
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800207 LOG(DEBUG) << "Engine::Received data for pixel stream " << streamId << " with timestamp "
Kathan Shukla12595b82020-02-05 10:27:15 -0800208 << timestamp;
209 if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
210 LOG(ERROR) << "Engine::Received bad stream id from prebuilt graph";
Kathan Shukla7a84fb12020-02-07 09:48:48 -0800211 return;
Kathan Shukla12595b82020-02-05 10:27:15 -0800212 }
213 mStreamManagers[streamId]->queuePacket(frame, timestamp);
Hanumant Singhb10788f2020-01-16 13:34:00 -0800214}
215
216void DefaultEngine::DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) {
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700217 LOG(DEBUG) << "Engine::Received data for stream " << streamId << " with timestamp "
218 << timestamp;
Hanumant Singhb10788f2020-01-16 13:34:00 -0800219 if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
220 LOG(ERROR) << "Engine::Received bad stream id from prebuilt graph";
Kathan Shukla7a84fb12020-02-07 09:48:48 -0800221 return;
Hanumant Singhb10788f2020-01-16 13:34:00 -0800222 }
223 std::string data(output);
224 mStreamManagers[streamId]->queuePacket(data.c_str(), data.size(), timestamp);
225}
226
227void DefaultEngine::DispatchGraphTerminationMessage(Status s, std::string&& msg) {
228 std::lock_guard<std::mutex> lock(mEngineLock);
229 if (s == SUCCESS) {
230 if (mCurrentPhase == kRunPhase) {
231 queueCommand("PrebuiltGraph", EngineCommand::Type::BROADCAST_INITIATE_STOP);
232 } else {
233 LOG(WARNING) << "Graph termination when not in run phase";
234 }
235 } else {
236 std::string error = msg;
237 queueError("PrebuiltGraph", error, false);
238 }
239}
240
241Status DefaultEngine::broadcastClientConfig() {
242 ClientConfig config = mConfigBuilder.emitClientOptions();
243
244 LOG(INFO) << "Engine::create stream manager";
245 Status ret = populateStreamManagers(config);
246 if (ret != Status::SUCCESS) {
247 return ret;
248 }
249
250 if (mGraph) {
251 ret = populateInputManagers(config);
252 if (ret != Status::SUCCESS) {
253 abortClientConfig(config);
254 return ret;
255 }
256
257 LOG(INFO) << "Engine::send client config entry to graph";
258 config.setPhaseState(PhaseState::ENTRY);
259 ret = mGraph->handleConfigPhase(config);
260 if (ret != Status::SUCCESS) {
261 abortClientConfig(config);
262 return ret;
263 }
264 LOG(INFO) << "Engine::send client config transition complete to graph";
265 config.setPhaseState(PhaseState::TRANSITION_COMPLETE);
266 ret = mGraph->handleConfigPhase(config);
267 if (ret != Status::SUCCESS) {
268 abortClientConfig(config);
269 return ret;
270 }
271 }
272 LOG(INFO) << "Engine::Graph configured";
273 // TODO add handling for remote graph
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800274 if (mDebugDisplayManager) {
275 mDebugDisplayManager->setFreePacketCallback(std::bind(
276 &DefaultEngine::freePacket, this, std::placeholders::_1, mDisplayStream));
277
278 ret = mDebugDisplayManager->handleConfigPhase(config);
279 if (ret != Status::SUCCESS) {
280 config.setPhaseState(PhaseState::ABORTED);
281 abortClientConfig(config, true);
282 return ret;
283 }
284 }
285
Hanumant Singhb10788f2020-01-16 13:34:00 -0800286 ret = mClient->handleConfigPhase(config);
287 if (ret != Status::SUCCESS) {
288 config.setPhaseState(PhaseState::ABORTED);
289 abortClientConfig(config, true);
290 return ret;
291 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800292
Hanumant Singhb10788f2020-01-16 13:34:00 -0800293 mCurrentPhase = kConfigPhase;
294 return Status::SUCCESS;
295}
296
297void DefaultEngine::abortClientConfig(const ClientConfig& config, bool resetGraph) {
298 mStreamManagers.clear();
299 mInputManagers.clear();
300 if (resetGraph && mGraph) {
301 (void)mGraph->handleConfigPhase(config);
302 }
303 (void)mClient->handleConfigPhase(config);
304 // TODO add handling for remote graph
305}
306
307Status DefaultEngine::broadcastStartRun() {
308 DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RUN);
309
310 std::vector<int> successfulStreams;
311 std::vector<int> successfulInputs;
312 for (auto& it : mStreamManagers) {
Hanumant Singhb10788f2020-01-16 13:34:00 -0800313 if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
314 LOG(ERROR) << "Engine::failure to enter run phase for stream " << it.first;
315 broadcastAbortRun(successfulStreams, successfulInputs);
316 return Status::INTERNAL_ERROR;
317 }
318 successfulStreams.push_back(it.first);
319 }
320 // TODO: send to remote
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800321 if (mDebugDisplayManager) {
322 (void)mDebugDisplayManager->handleExecutionPhase(runEvent);
323 }
324
Hanumant Singhb10788f2020-01-16 13:34:00 -0800325 Status ret;
326 if (mGraph) {
327 LOG(INFO) << "Engine::sending start run to prebuilt";
328 ret = mGraph->handleExecutionPhase(runEvent);
329 if (ret != Status::SUCCESS) {
330 broadcastAbortRun(successfulStreams, successfulInputs);
331 }
332 for (auto& it : mInputManagers) {
333 if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
334 LOG(ERROR) << "Engine::failure to enter run phase for input manager " << it.first;
335 broadcastAbortRun(successfulStreams, successfulInputs, true);
336 return Status::INTERNAL_ERROR;
337 }
338 successfulInputs.push_back(it.first);
339 }
340 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800341
Hanumant Singhb10788f2020-01-16 13:34:00 -0800342 runEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RUN);
343 LOG(INFO) << "Engine::sending run transition complete to client";
344 ret = mClient->handleExecutionPhase(runEvent);
345 if (ret != Status::SUCCESS) {
346 LOG(ERROR) << "Engine::client failure to acknowledge transition to run complete ";
347 broadcastAbortRun(successfulStreams, successfulInputs, true);
348 return ret;
349 }
350 for (auto& it : mStreamManagers) {
351 (void)it.second->handleExecutionPhase(runEvent);
352 }
353 // TODO: send to remote
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800354 if (mDebugDisplayManager) {
355 (void)mDebugDisplayManager->handleExecutionPhase(runEvent);
356 }
357
Hanumant Singhb10788f2020-01-16 13:34:00 -0800358 if (mGraph) {
359 LOG(INFO) << "Engine::sending run transition complete to prebuilt";
360 (void)mGraph->handleExecutionPhase(runEvent);
361 for (auto& it : mInputManagers) {
362 (void)it.second->handleExecutionPhase(runEvent);
363 }
364 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800365
Hanumant Singhb10788f2020-01-16 13:34:00 -0800366 LOG(INFO) << "Engine::Running";
367 mCurrentPhase = kRunPhase;
368 return Status::SUCCESS;
369}
370
371void DefaultEngine::broadcastAbortRun(const std::vector<int>& streamIds,
372 const std::vector<int>& inputIds, bool abortGraph) {
373 DefaultEvent runEvent = DefaultEvent::generateAbortEvent(DefaultEvent::RUN);
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800374 if (mDebugDisplayManager) {
375 (void)mDebugDisplayManager->handleExecutionPhase(runEvent);
376 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800377 std::for_each(streamIds.begin(), streamIds.end(), [this, runEvent](int id) {
378 (void)this->mStreamManagers[id]->handleExecutionPhase(runEvent);
379 });
380 std::for_each(inputIds.begin(), inputIds.end(), [this, runEvent](int id) {
381 (void)this->mInputManagers[id]->handleExecutionPhase(runEvent);
382 });
383 if (abortGraph) {
384 if (mGraph) {
385 (void)mGraph->handleExecutionPhase(runEvent);
386 }
387 }
Hanumant Singh1642fcc2020-02-17 23:14:32 -0800388 (void)mClient->handleExecutionPhase(runEvent);
Hanumant Singhb10788f2020-01-16 13:34:00 -0800389}
390
391Status DefaultEngine::broadcastStopWithFlush() {
392 DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_WITH_FLUSH);
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800393 if (mDebugDisplayManager) {
394 (void)mDebugDisplayManager->handleStopWithFlushPhase(runEvent);
395 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800396
397 if (mGraph) {
398 for (auto& it : mInputManagers) {
399 (void)it.second->handleStopWithFlushPhase(runEvent);
400 }
401 if (mStopFromClient) {
402 (void)mGraph->handleStopWithFlushPhase(runEvent);
403 }
404 }
405 // TODO: send to remote.
406 for (auto& it : mStreamManagers) {
407 (void)it.second->handleStopWithFlushPhase(runEvent);
408 }
409 if (!mStopFromClient) {
410 (void)mClient->handleStopWithFlushPhase(runEvent);
411 }
412 mCurrentPhase = kStopPhase;
413 return Status::SUCCESS;
414}
415
416Status DefaultEngine::broadcastStopComplete() {
417 DefaultEvent runEvent =
418 DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_WITH_FLUSH);
419 if (mGraph) {
420 for (auto& it : mInputManagers) {
421 (void)it.second->handleStopWithFlushPhase(runEvent);
422 }
423 (void)mGraph->handleStopWithFlushPhase(runEvent);
424 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800425 if (mDebugDisplayManager) {
426 (void)mDebugDisplayManager->handleStopWithFlushPhase(runEvent);
427 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800428 // TODO: send to remote.
429 for (auto& it : mStreamManagers) {
430 (void)it.second->handleStopWithFlushPhase(runEvent);
431 }
432 (void)mClient->handleStopWithFlushPhase(runEvent);
433 mCurrentPhase = kConfigPhase;
434 return Status::SUCCESS;
435}
436
437void DefaultEngine::broadcastHalt() {
438 DefaultEvent stopEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_IMMEDIATE);
439
440 if (mGraph) {
441 for (auto& it : mInputManagers) {
442 (void)it.second->handleStopImmediatePhase(stopEvent);
443 }
444
445 if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos)) {
446 (void)mGraph->handleStopImmediatePhase(stopEvent);
447 }
448 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800449 if (mDebugDisplayManager) {
450 (void)mDebugDisplayManager->handleStopImmediatePhase(stopEvent);
451 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800452 // TODO: send to remote if client was source.
453 for (auto& it : mStreamManagers) {
454 (void)it.second->handleStopImmediatePhase(stopEvent);
455 }
456 if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
457 (void)mClient->handleStopImmediatePhase(stopEvent);
458 }
459
460 stopEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_IMMEDIATE);
461 if (mGraph) {
462 for (auto& it : mInputManagers) {
463 (void)it.second->handleStopImmediatePhase(stopEvent);
464 }
465 // TODO: send to graph or remote if client was source.
466
467 if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos) && mGraph) {
468 (void)mGraph->handleStopImmediatePhase(stopEvent);
469 }
470 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800471 if (mDebugDisplayManager) {
472 (void)mDebugDisplayManager->handleStopImmediatePhase(stopEvent);
473 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800474 for (auto& it : mStreamManagers) {
475 (void)it.second->handleStopImmediatePhase(stopEvent);
476 }
477 if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
478 (void)mClient->handleStopImmediatePhase(stopEvent);
479 }
480 mCurrentPhase = kConfigPhase;
481}
482
483void DefaultEngine::broadcastReset() {
484 mStreamManagers.clear();
485 mInputManagers.clear();
486 DefaultEvent resetEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RESET);
487 (void)mClient->handleResetPhase(resetEvent);
488 if (mGraph) {
489 (void)mGraph->handleResetPhase(resetEvent);
490 }
491 resetEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RESET);
492 (void)mClient->handleResetPhase(resetEvent);
493 if (mGraph) {
494 (void)mGraph->handleResetPhase(resetEvent);
495 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800496 if (mDebugDisplayManager) {
497 (void)mDebugDisplayManager->handleResetPhase(resetEvent);
498 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800499 // TODO: send to remote runner
500 mConfigBuilder.reset();
501 mCurrentPhase = kResetPhase;
502 mStopFromClient = false;
503}
504
505Status DefaultEngine::populateStreamManagers(const ClientConfig& config) {
506 std::map<int, int> outputConfigs;
507 if (config.getOutputStreamConfigs(outputConfigs) != Status::SUCCESS) {
508 return Status::ILLEGAL_STATE;
509 }
510 for (auto& configIt : outputConfigs) {
511 int streamId = configIt.first;
512 int maxInFlightPackets = configIt.second;
513 proto::OutputConfig outputDescriptor;
514 // find the output descriptor for requested stream id
515 bool foundDesc = false;
516 for (auto& optionIt : mGraphDescriptor.output_configs()) {
517 if (optionIt.stream_id() == streamId) {
518 outputDescriptor = optionIt;
519 foundDesc = true;
520 break;
521 }
522 }
523 if (!foundDesc) {
524 LOG(ERROR) << "no matching output config for requested id " << streamId;
525 return Status::INVALID_ARGUMENT;
526 }
527 std::function<Status(std::shared_ptr<MemHandle>)> packetCb =
528 [this, streamId](std::shared_ptr<MemHandle> handle) -> Status {
529 return this->forwardOutputDataToClient(streamId, handle);
530 };
531
532 std::function<void(std::string)> errorCb = [this, streamId](std::string m) {
533 std::string source = "StreamManager:" + std::to_string(streamId) + " : " + m;
534 this->queueError(source, m, false);
535 };
536
537 std::function<void()> eos = [this, streamId]() {
538 std::string source = "StreamManager:" + std::to_string(streamId);
539 std::lock_guard<std::mutex> lock(this->mEngineLock);
540 this->queueCommand(source, EngineCommand::Type::POLL_COMPLETE);
541 };
542
543 std::shared_ptr<StreamEngineInterface> engine = std::make_shared<StreamCallback>(
544 std::move(eos), std::move(errorCb), std::move(packetCb));
545 mStreamManagers.emplace(configIt.first, mStreamFactory.getStreamManager(
546 outputDescriptor, engine, maxInFlightPackets));
547 if (mStreamManagers[streamId] == nullptr) {
548 LOG(ERROR) << "unable to create stream manager for stream " << streamId;
549 return Status::INTERNAL_ERROR;
550 }
551 }
552 return Status::SUCCESS;
553}
554
555Status DefaultEngine::forwardOutputDataToClient(int streamId,
556 std::shared_ptr<MemHandle>& dataHandle) {
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800557 if (streamId != mDisplayStream) {
558 return mClient->dispatchPacketToClient(streamId, dataHandle);
559 }
560
561 auto displayMgrPacket = dataHandle;
562 if (mConfigBuilder.clientConfigEnablesDisplayStream()) {
563 if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
564 displayMgrPacket = nullptr;
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700565 } else {
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800566 displayMgrPacket = mStreamManagers[streamId]->clonePacket(dataHandle);
567 }
568 Status status = mClient->dispatchPacketToClient(streamId, dataHandle);
569 if (status != Status::SUCCESS) {
570 return status;
Hanumant Singhb10788f2020-01-16 13:34:00 -0800571 }
572 }
Kathan Shukla4d5e9ad2020-03-04 18:41:08 -0800573 CHECK(mDebugDisplayManager);
574 return mDebugDisplayManager->displayFrame(dataHandle);
Hanumant Singhb10788f2020-01-16 13:34:00 -0800575}
576
577Status DefaultEngine::populateInputManagers(const ClientConfig& config) {
578 if (mIgnoreInputManager) {
579 return Status::SUCCESS;
580 }
Kathan Shukla691ad062020-03-18 15:33:42 -0700581
Hanumant Singhb10788f2020-01-16 13:34:00 -0800582 proto::InputConfig inputDescriptor;
583 int selectedId;
584
585 if (config.getInputConfigId(&selectedId) != Status::SUCCESS) {
586 return Status::INVALID_ARGUMENT;
587 }
588
589 for (auto& inputIt : mGraphDescriptor.input_configs()) {
590 if (selectedId == inputIt.config_id()) {
591 inputDescriptor = inputIt;
592 std::shared_ptr<InputCallback> cb = std::make_shared<InputCallback>(
593 selectedId,
594 [this](int id) {
595 std::string source = "InputManager:" + std::to_string(id);
596 this->queueError(source, "", false);
597 },
Yogeshwar Nagaraj1af89772020-01-20 11:44:18 -0800598 [this](int streamId, int64_t timestamp, const InputFrame& frame) {
599 return this->mGraph->SetInputStreamPixelData(streamId, timestamp, frame);
600 });
Hanumant Singhb10788f2020-01-16 13:34:00 -0800601 mInputManagers.emplace(selectedId,
602 mInputFactory.createInputManager(inputDescriptor, cb));
603 if (mInputManagers[selectedId] == nullptr) {
604 LOG(ERROR) << "unable to create input manager for stream " << selectedId;
605 // TODO: Add print
606 return Status::INTERNAL_ERROR;
607 }
608 return Status::SUCCESS;
609 }
610 }
611 return Status::INVALID_ARGUMENT;
612}
613
614/**
615 * Engine Command Queue and Error Queue handling
616 */
617void DefaultEngine::processCommands() {
618 std::unique_lock<std::mutex> lock(mEngineLock);
619 while (1) {
620 LOG(INFO) << "Engine::Waiting on commands ";
621 mWakeLooper.wait(lock, [this] {
622 if (this->mCommandQueue.empty() && !mCurrentPhaseError) {
623 return false;
624 } else {
625 return true;
626 }
627 });
628 if (mCurrentPhaseError) {
629 mErrorQueue.push(*mCurrentPhaseError);
630
631 processComponentError(mCurrentPhaseError->source);
632 mCurrentPhaseError = nullptr;
633 std::queue<EngineCommand> empty;
634 std::swap(mCommandQueue, empty);
635 continue;
636 }
637 EngineCommand ec = mCommandQueue.front();
638 mCommandQueue.pop();
639 switch (ec.cmdType) {
640 case EngineCommand::Type::BROADCAST_CONFIG:
Kathan Shukla3db91902020-04-15 13:11:24 -0700641 LOG(INFO) << "Engine::Received broadcast config request";
Hanumant Singhb10788f2020-01-16 13:34:00 -0800642 (void)broadcastClientConfig();
643 break;
644 case EngineCommand::Type::BROADCAST_START_RUN:
Kathan Shukla3db91902020-04-15 13:11:24 -0700645 LOG(INFO) << "Engine::Received broadcast run request";
Hanumant Singhb10788f2020-01-16 13:34:00 -0800646 (void)broadcastStartRun();
647 break;
648 case EngineCommand::Type::BROADCAST_INITIATE_STOP:
649 if (ec.source.find("ClientInterface") != std::string::npos) {
650 mStopFromClient = true;
651 }
Kathan Shukla3db91902020-04-15 13:11:24 -0700652 LOG(INFO) << "Engine::Received broadcast stop with flush request";
Hanumant Singhb10788f2020-01-16 13:34:00 -0800653 broadcastStopWithFlush();
654 break;
655 case EngineCommand::Type::POLL_COMPLETE:
656 LOG(INFO) << "Engine::Received Poll stream managers for completion request";
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700657 {
658 int id = getStreamIdFromSource(ec.source);
659 bool all_done = true;
660 for (auto& it : mStreamManagers) {
661 if (it.first == id) {
662 continue;
663 }
664 if (it.second->getState() != StreamManager::State::STOPPED) {
665 all_done = false;
666 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800667 }
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700668 if (all_done) {
669 broadcastStopComplete();
Hanumant Singhb10788f2020-01-16 13:34:00 -0800670 }
671 }
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700672 break;
673 case EngineCommand::Type::RESET_CONFIG:
674 (void)broadcastReset();
675 break;
676 case EngineCommand::Type::RELEASE_DEBUGGER:
677 {
678 // broadcastReset() resets the previous copy, so save a copy of the old config.
679 ConfigBuilder previousConfig = mConfigBuilder;
680 (void)broadcastReset();
681 mConfigBuilder =
682 previousConfig.updateProfilingType(proto::ProfilingType::DISABLED);
683 (void)broadcastClientConfig();
684 }
685 break;
686 case EngineCommand::Type::READ_PROFILING:
687 std::string debugData;
Kathan Shukla3db91902020-04-15 13:11:24 -0700688 if (mGraph && (mCurrentPhase == kConfigPhase || mCurrentPhase == kRunPhase
689 || mCurrentPhase == kStopPhase)) {
Kathan Shuklabf7a01e2020-03-20 07:15:47 -0700690 debugData = mGraph->GetDebugInfo();
691 }
692 if (mClient) {
693 Status status = mClient->deliverGraphDebugInfo(debugData);
694 if (status != Status::SUCCESS) {
695 LOG(ERROR) << "Failed to deliver graph debug info to client.";
696 }
Hanumant Singhb10788f2020-01-16 13:34:00 -0800697 }
698 break;
699 }
700 }
701}
702
703void DefaultEngine::processComponentError(std::string source) {
704 if (mCurrentPhase == kRunPhase || mCurrentPhase == kStopPhase) {
705 (void)broadcastHalt();
706 }
707 if (source.find("ClientInterface") != std::string::npos) {
708 (void)broadcastReset();
709 }
710}
711
712void DefaultEngine::queueCommand(std::string source, EngineCommand::Type type) {
713 mCommandQueue.push(EngineCommand(source, type));
714 mWakeLooper.notify_all();
715}
716
717void DefaultEngine::queueError(std::string source, std::string msg, bool fatal) {
718 std::lock_guard<std::mutex> lock(mEngineLock);
719 // current phase already has an error report
720 if (!mCurrentPhaseError) {
721 mCurrentPhaseError = std::make_unique<ComponentError>(source, msg, mCurrentPhase, fatal);
722 mWakeLooper.notify_all();
723 }
724}
725
726/**
727 * InputCallback implementation
728 */
Yogeshwar Nagaraj1af89772020-01-20 11:44:18 -0800729InputCallback::InputCallback(
730 int id, const std::function<void(int)>&& cb,
731 const std::function<Status(int, int64_t timestamp, const InputFrame&)>&& packetCb)
Hanumant Singhb10788f2020-01-16 13:34:00 -0800732 : mErrorCallback(cb), mPacketHandler(packetCb), mInputId(id) {
733}
734
Yogeshwar Nagaraj1af89772020-01-20 11:44:18 -0800735Status InputCallback::dispatchInputFrame(int streamId, int64_t timestamp, const InputFrame& frame) {
736 return mPacketHandler(streamId, timestamp, frame);
Hanumant Singhb10788f2020-01-16 13:34:00 -0800737}
Yogeshwar Nagaraj1af89772020-01-20 11:44:18 -0800738
Hanumant Singhb10788f2020-01-16 13:34:00 -0800739void InputCallback::notifyInputError() {
740 mErrorCallback(mInputId);
741}
742
743/**
744 * StreamCallback implementation
745 */
746StreamCallback::StreamCallback(
747 const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
748 const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler)
749 : mErrorHandler(errorCb), mEndOfStreamHandler(eos), mPacketHandler(packetHandler) {
750}
751
752void StreamCallback::notifyError(std::string msg) {
753 mErrorHandler(msg);
754}
755
756void StreamCallback::notifyEndOfStream() {
757 mEndOfStreamHandler();
758}
759
760Status StreamCallback::dispatchPacket(const std::shared_ptr<MemHandle>& packet) {
761 return mPacketHandler(packet);
762}
763
764} // namespace engine
765} // namespace runner
766} // namespace computepipe
767} // namespace automotive
768} // namespace android