blob: 92eabf2d1821f62e51f8c0f0debe5837cefd6ef3 [file] [log] [blame]
Hanumant Singhb10788f2020-01-16 13:34:00 -08001// Copyright (C) 2020 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#include "DefaultEngine.h"
16
17#include <android-base/logging.h>
18
19#include <algorithm>
20#include <cassert>
21#include <memory>
22#include <mutex>
23#include <thread>
24#include <vector>
25
26#include "ClientInterface.h"
27#include "EventGenerator.h"
28#include "InputFrame.h"
29#include "PrebuiltGraph.h"
30
31namespace android {
32namespace automotive {
33namespace computepipe {
34namespace runner {
35namespace engine {
36
37using android::automotive::computepipe::graph::PrebuiltGraph;
38using android::automotive::computepipe::runner::client_interface::ClientInterface;
39using android::automotive::computepipe::runner::generator::DefaultEvent;
40using android::automotive::computepipe::runner::input_manager::InputEngineInterface;
41using android::automotive::computepipe::runner::stream_manager::StreamEngineInterface;
42using android::automotive::computepipe::runner::stream_manager::StreamManager;
43
44namespace {
45
/**
 * Extracts the numeric id that follows the first ':' in a component source
 * tag such as "StreamManager:<id>".
 *
 * If the tag has no ':' the whole string is parsed. Parsing is done by
 * std::stoi, so any text after the leading number is ignored and a string
 * without a leading number makes std::stoi throw.
 */
int getStreamIdFromSource(std::string source) {
    const std::string::size_type separator = source.find(":");
    // A missing ':' yields npos, and npos + 1 wraps around to 0, i.e. the
    // id is parsed from the start of the string.
    const std::string::size_type idStart =
            (separator == std::string::npos) ? 0 : separator + 1;
    const std::string idText = source.substr(idStart);
    return std::stoi(idText);
}
50} // namespace
51
52void DefaultEngine::setClientInterface(std::unique_ptr<ClientInterface>&& client) {
53 mClient = std::move(client);
54}
55
56void DefaultEngine::setPrebuiltGraph(std::unique_ptr<PrebuiltGraph>&& graph) {
57 mGraph = std::move(graph);
58 mGraphDescriptor = mGraph->GetSupportedGraphConfigs();
59}
60
61Status DefaultEngine::setArgs(std::string engine_args) {
62 auto pos = engine_args.find(kNoInputManager);
63 if (pos != std::string::npos) {
64 mIgnoreInputManager = true;
65 }
66 pos = engine_args.find(kDisplayStreamId);
67 if (pos == std::string::npos) {
68 return Status::SUCCESS;
69 }
70 mDisplayStream = std::stoi(engine_args.substr(pos + strlen(kDisplayStreamId)));
71 mConfigBuilder.setDebugDisplayStream(mDisplayStream);
72 return Status::SUCCESS;
73}
74
75Status DefaultEngine::activate() {
76 mConfigBuilder.reset();
77 mEngineThread = std::make_unique<std::thread>(&DefaultEngine::processCommands, this);
78 return mClient->activate();
79}
80
81Status DefaultEngine::processClientConfigUpdate(const proto::ConfigurationCommand& command) {
82 // TODO check current phase
83 std::lock_guard<std::mutex> lock(mEngineLock);
84 if (mCurrentPhase != kResetPhase) {
85 return Status::ILLEGAL_STATE;
86 }
87 if (command.has_set_input_source()) {
88 mConfigBuilder =
89 mConfigBuilder.updateInputConfigOption(command.set_input_source().source_id());
90 } else if (command.has_set_termination_option()) {
91 mConfigBuilder = mConfigBuilder.updateTerminationOption(
92 command.set_termination_option().termination_option_id());
93 } else if (command.has_set_output_stream()) {
94 mConfigBuilder = mConfigBuilder.updateOutputStreamOption(
95 command.set_output_stream().stream_id(),
96 command.set_output_stream().max_inflight_packets_count());
97 } else if (command.has_set_offload_offload()) {
98 mConfigBuilder =
99 mConfigBuilder.updateOffloadOption(command.set_offload_offload().offload_option_id());
100 } else {
101 return SUCCESS;
102 }
103 return Status::SUCCESS;
104}
105
106Status DefaultEngine::processClientCommand(const proto::ControlCommand& command) {
107 // TODO check current phase
108 std::lock_guard<std::mutex> lock(mEngineLock);
109
110 if (command.has_apply_configs()) {
111 if (mCurrentPhase != kResetPhase) {
112 return Status::ILLEGAL_STATE;
113 }
114 queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_CONFIG);
115 return Status::SUCCESS;
116 }
117 if (command.has_start_graph()) {
118 if (mCurrentPhase != kConfigPhase) {
119 return Status::ILLEGAL_STATE;
120 }
121 queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_START_RUN);
122 return Status::SUCCESS;
123 }
124 if (command.has_stop_graph()) {
125 if (mCurrentPhase != kRunPhase) {
126 return Status::ILLEGAL_STATE;
127 }
128 mStopFromClient = true;
129 queueCommand("ClientInterface", EngineCommand::Type::BROADCAST_INITIATE_STOP);
130 return Status::SUCCESS;
131 }
132 if (command.has_death_notification()) {
133 mErrorQueue.push(ComponentError("ClientInterface", "Client death", mCurrentPhase, false));
134 mWakeLooper.notify_all();
135 return Status::SUCCESS;
136 }
137 return Status::SUCCESS;
138}
139
140Status DefaultEngine::freePacket(const std::shared_ptr<MemHandle>& packet) {
141 int streamId = packet->getStreamId();
142 mStreamManagers[streamId]->freePacket(packet);
143 return Status::SUCCESS;
144}
145
146/**
147 * Methods from PrebuiltEngineInterface
148 */
149void DefaultEngine::DispatchPixelData(int /* streamId */, int64_t /* timestamp */,
150 const uint8_t* /* pixels */, int /* width */,
151 int /* height */, int /* step */, PixelFormat /* format*/) {
152 // TODO: b/147975150 Add pixel stream forwarding to stream manager.
153 return;
154}
155
156void DefaultEngine::DispatchSerializedData(int streamId, int64_t timestamp, std::string&& output) {
157 LOG(INFO) << "Engine::Received data for stream " << streamId << " with timestamp " << timestamp;
158 if (mStreamManagers.find(streamId) == mStreamManagers.end()) {
159 LOG(ERROR) << "Engine::Received bad stream id from prebuilt graph";
160 }
161 std::string data(output);
162 mStreamManagers[streamId]->queuePacket(data.c_str(), data.size(), timestamp);
163}
164
165void DefaultEngine::DispatchGraphTerminationMessage(Status s, std::string&& msg) {
166 std::lock_guard<std::mutex> lock(mEngineLock);
167 if (s == SUCCESS) {
168 if (mCurrentPhase == kRunPhase) {
169 queueCommand("PrebuiltGraph", EngineCommand::Type::BROADCAST_INITIATE_STOP);
170 } else {
171 LOG(WARNING) << "Graph termination when not in run phase";
172 }
173 } else {
174 std::string error = msg;
175 queueError("PrebuiltGraph", error, false);
176 }
177}
178
179Status DefaultEngine::broadcastClientConfig() {
180 ClientConfig config = mConfigBuilder.emitClientOptions();
181
182 LOG(INFO) << "Engine::create stream manager";
183 Status ret = populateStreamManagers(config);
184 if (ret != Status::SUCCESS) {
185 return ret;
186 }
187
188 if (mGraph) {
189 ret = populateInputManagers(config);
190 if (ret != Status::SUCCESS) {
191 abortClientConfig(config);
192 return ret;
193 }
194
195 LOG(INFO) << "Engine::send client config entry to graph";
196 config.setPhaseState(PhaseState::ENTRY);
197 ret = mGraph->handleConfigPhase(config);
198 if (ret != Status::SUCCESS) {
199 abortClientConfig(config);
200 return ret;
201 }
202 LOG(INFO) << "Engine::send client config transition complete to graph";
203 config.setPhaseState(PhaseState::TRANSITION_COMPLETE);
204 ret = mGraph->handleConfigPhase(config);
205 if (ret != Status::SUCCESS) {
206 abortClientConfig(config);
207 return ret;
208 }
209 }
210 LOG(INFO) << "Engine::Graph configured";
211 // TODO add handling for remote graph
212 ret = mClient->handleConfigPhase(config);
213 if (ret != Status::SUCCESS) {
214 config.setPhaseState(PhaseState::ABORTED);
215 abortClientConfig(config, true);
216 return ret;
217 }
218 mCurrentPhase = kConfigPhase;
219 return Status::SUCCESS;
220}
221
222void DefaultEngine::abortClientConfig(const ClientConfig& config, bool resetGraph) {
223 mStreamManagers.clear();
224 mInputManagers.clear();
225 if (resetGraph && mGraph) {
226 (void)mGraph->handleConfigPhase(config);
227 }
228 (void)mClient->handleConfigPhase(config);
229 // TODO add handling for remote graph
230}
231
232Status DefaultEngine::broadcastStartRun() {
233 DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RUN);
234
235 std::vector<int> successfulStreams;
236 std::vector<int> successfulInputs;
237 for (auto& it : mStreamManagers) {
238 LOG(INFO) << "Engine::sending start run to stream manager " << it.first << " failed";
239 if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
240 LOG(ERROR) << "Engine::failure to enter run phase for stream " << it.first;
241 broadcastAbortRun(successfulStreams, successfulInputs);
242 return Status::INTERNAL_ERROR;
243 }
244 successfulStreams.push_back(it.first);
245 }
246 // TODO: send to remote
247 Status ret;
248 if (mGraph) {
249 LOG(INFO) << "Engine::sending start run to prebuilt";
250 ret = mGraph->handleExecutionPhase(runEvent);
251 if (ret != Status::SUCCESS) {
252 broadcastAbortRun(successfulStreams, successfulInputs);
253 }
254 for (auto& it : mInputManagers) {
255 if (it.second->handleExecutionPhase(runEvent) != Status::SUCCESS) {
256 LOG(ERROR) << "Engine::failure to enter run phase for input manager " << it.first;
257 broadcastAbortRun(successfulStreams, successfulInputs, true);
258 return Status::INTERNAL_ERROR;
259 }
260 successfulInputs.push_back(it.first);
261 }
262 }
263 runEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RUN);
264 LOG(INFO) << "Engine::sending run transition complete to client";
265 ret = mClient->handleExecutionPhase(runEvent);
266 if (ret != Status::SUCCESS) {
267 LOG(ERROR) << "Engine::client failure to acknowledge transition to run complete ";
268 broadcastAbortRun(successfulStreams, successfulInputs, true);
269 return ret;
270 }
271 for (auto& it : mStreamManagers) {
272 (void)it.second->handleExecutionPhase(runEvent);
273 }
274 // TODO: send to remote
275 if (mGraph) {
276 LOG(INFO) << "Engine::sending run transition complete to prebuilt";
277 (void)mGraph->handleExecutionPhase(runEvent);
278 for (auto& it : mInputManagers) {
279 (void)it.second->handleExecutionPhase(runEvent);
280 }
281 }
282 LOG(INFO) << "Engine::Running";
283 mCurrentPhase = kRunPhase;
284 return Status::SUCCESS;
285}
286
287void DefaultEngine::broadcastAbortRun(const std::vector<int>& streamIds,
288 const std::vector<int>& inputIds, bool abortGraph) {
289 DefaultEvent runEvent = DefaultEvent::generateAbortEvent(DefaultEvent::RUN);
290 std::for_each(streamIds.begin(), streamIds.end(), [this, runEvent](int id) {
291 (void)this->mStreamManagers[id]->handleExecutionPhase(runEvent);
292 });
293 std::for_each(inputIds.begin(), inputIds.end(), [this, runEvent](int id) {
294 (void)this->mInputManagers[id]->handleExecutionPhase(runEvent);
295 });
296 if (abortGraph) {
297 if (mGraph) {
298 (void)mGraph->handleExecutionPhase(runEvent);
299 }
300 }
301}
302
303Status DefaultEngine::broadcastStopWithFlush() {
304 DefaultEvent runEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_WITH_FLUSH);
305
306 if (mGraph) {
307 for (auto& it : mInputManagers) {
308 (void)it.second->handleStopWithFlushPhase(runEvent);
309 }
310 if (mStopFromClient) {
311 (void)mGraph->handleStopWithFlushPhase(runEvent);
312 }
313 }
314 // TODO: send to remote.
315 for (auto& it : mStreamManagers) {
316 (void)it.second->handleStopWithFlushPhase(runEvent);
317 }
318 if (!mStopFromClient) {
319 (void)mClient->handleStopWithFlushPhase(runEvent);
320 }
321 mCurrentPhase = kStopPhase;
322 return Status::SUCCESS;
323}
324
325Status DefaultEngine::broadcastStopComplete() {
326 DefaultEvent runEvent =
327 DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_WITH_FLUSH);
328 if (mGraph) {
329 for (auto& it : mInputManagers) {
330 (void)it.second->handleStopWithFlushPhase(runEvent);
331 }
332 (void)mGraph->handleStopWithFlushPhase(runEvent);
333 }
334 // TODO: send to remote.
335 for (auto& it : mStreamManagers) {
336 (void)it.second->handleStopWithFlushPhase(runEvent);
337 }
338 (void)mClient->handleStopWithFlushPhase(runEvent);
339 mCurrentPhase = kConfigPhase;
340 return Status::SUCCESS;
341}
342
343void DefaultEngine::broadcastHalt() {
344 DefaultEvent stopEvent = DefaultEvent::generateEntryEvent(DefaultEvent::STOP_IMMEDIATE);
345
346 if (mGraph) {
347 for (auto& it : mInputManagers) {
348 (void)it.second->handleStopImmediatePhase(stopEvent);
349 }
350
351 if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos)) {
352 (void)mGraph->handleStopImmediatePhase(stopEvent);
353 }
354 }
355 // TODO: send to remote if client was source.
356 for (auto& it : mStreamManagers) {
357 (void)it.second->handleStopImmediatePhase(stopEvent);
358 }
359 if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
360 (void)mClient->handleStopImmediatePhase(stopEvent);
361 }
362
363 stopEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::STOP_IMMEDIATE);
364 if (mGraph) {
365 for (auto& it : mInputManagers) {
366 (void)it.second->handleStopImmediatePhase(stopEvent);
367 }
368 // TODO: send to graph or remote if client was source.
369
370 if ((mCurrentPhaseError->source.find("PrebuiltGraph") == std::string::npos) && mGraph) {
371 (void)mGraph->handleStopImmediatePhase(stopEvent);
372 }
373 }
374 for (auto& it : mStreamManagers) {
375 (void)it.second->handleStopImmediatePhase(stopEvent);
376 }
377 if (mCurrentPhaseError->source.find("ClientInterface") == std::string::npos) {
378 (void)mClient->handleStopImmediatePhase(stopEvent);
379 }
380 mCurrentPhase = kConfigPhase;
381}
382
383void DefaultEngine::broadcastReset() {
384 mStreamManagers.clear();
385 mInputManagers.clear();
386 DefaultEvent resetEvent = DefaultEvent::generateEntryEvent(DefaultEvent::RESET);
387 (void)mClient->handleResetPhase(resetEvent);
388 if (mGraph) {
389 (void)mGraph->handleResetPhase(resetEvent);
390 }
391 resetEvent = DefaultEvent::generateTransitionCompleteEvent(DefaultEvent::RESET);
392 (void)mClient->handleResetPhase(resetEvent);
393 if (mGraph) {
394 (void)mGraph->handleResetPhase(resetEvent);
395 }
396 // TODO: send to remote runner
397 mConfigBuilder.reset();
398 mCurrentPhase = kResetPhase;
399 mStopFromClient = false;
400}
401
402Status DefaultEngine::populateStreamManagers(const ClientConfig& config) {
403 std::map<int, int> outputConfigs;
404 if (config.getOutputStreamConfigs(outputConfigs) != Status::SUCCESS) {
405 return Status::ILLEGAL_STATE;
406 }
407 for (auto& configIt : outputConfigs) {
408 int streamId = configIt.first;
409 int maxInFlightPackets = configIt.second;
410 proto::OutputConfig outputDescriptor;
411 // find the output descriptor for requested stream id
412 bool foundDesc = false;
413 for (auto& optionIt : mGraphDescriptor.output_configs()) {
414 if (optionIt.stream_id() == streamId) {
415 outputDescriptor = optionIt;
416 foundDesc = true;
417 break;
418 }
419 }
420 if (!foundDesc) {
421 LOG(ERROR) << "no matching output config for requested id " << streamId;
422 return Status::INVALID_ARGUMENT;
423 }
424 std::function<Status(std::shared_ptr<MemHandle>)> packetCb =
425 [this, streamId](std::shared_ptr<MemHandle> handle) -> Status {
426 return this->forwardOutputDataToClient(streamId, handle);
427 };
428
429 std::function<void(std::string)> errorCb = [this, streamId](std::string m) {
430 std::string source = "StreamManager:" + std::to_string(streamId) + " : " + m;
431 this->queueError(source, m, false);
432 };
433
434 std::function<void()> eos = [this, streamId]() {
435 std::string source = "StreamManager:" + std::to_string(streamId);
436 std::lock_guard<std::mutex> lock(this->mEngineLock);
437 this->queueCommand(source, EngineCommand::Type::POLL_COMPLETE);
438 };
439
440 std::shared_ptr<StreamEngineInterface> engine = std::make_shared<StreamCallback>(
441 std::move(eos), std::move(errorCb), std::move(packetCb));
442 mStreamManagers.emplace(configIt.first, mStreamFactory.getStreamManager(
443 outputDescriptor, engine, maxInFlightPackets));
444 if (mStreamManagers[streamId] == nullptr) {
445 LOG(ERROR) << "unable to create stream manager for stream " << streamId;
446 return Status::INTERNAL_ERROR;
447 }
448 }
449 return Status::SUCCESS;
450}
451
452Status DefaultEngine::forwardOutputDataToClient(int streamId,
453 std::shared_ptr<MemHandle>& dataHandle) {
454 if (streamId == mDisplayStream) {
455 // TODO: dispatch to display
456 if (mConfigBuilder.clientConfigEnablesDisplayStream()) {
457 return mClient->dispatchPacketToClient(streamId, dataHandle);
458 }
459 }
460 return mClient->dispatchPacketToClient(streamId, dataHandle);
461}
462
463Status DefaultEngine::populateInputManagers(const ClientConfig& config) {
464 if (mIgnoreInputManager) {
465 return Status::SUCCESS;
466 }
467 proto::InputConfig inputDescriptor;
468 int selectedId;
469
470 if (config.getInputConfigId(&selectedId) != Status::SUCCESS) {
471 return Status::INVALID_ARGUMENT;
472 }
473
474 for (auto& inputIt : mGraphDescriptor.input_configs()) {
475 if (selectedId == inputIt.config_id()) {
476 inputDescriptor = inputIt;
477 std::shared_ptr<InputCallback> cb = std::make_shared<InputCallback>(
478 selectedId,
479 [this](int id) {
480 std::string source = "InputManager:" + std::to_string(id);
481 this->queueError(source, "", false);
482 },
483 [](int /*streamId */, const InputFrame& /* frame */) { return Status::SUCCESS; });
484 mInputManagers.emplace(selectedId,
485 mInputFactory.createInputManager(inputDescriptor, cb));
486 if (mInputManagers[selectedId] == nullptr) {
487 LOG(ERROR) << "unable to create input manager for stream " << selectedId;
488 // TODO: Add print
489 return Status::INTERNAL_ERROR;
490 }
491 return Status::SUCCESS;
492 }
493 }
494 return Status::INVALID_ARGUMENT;
495}
496
497/**
498 * Engine Command Queue and Error Queue handling
499 */
500void DefaultEngine::processCommands() {
501 std::unique_lock<std::mutex> lock(mEngineLock);
502 while (1) {
503 LOG(INFO) << "Engine::Waiting on commands ";
504 mWakeLooper.wait(lock, [this] {
505 if (this->mCommandQueue.empty() && !mCurrentPhaseError) {
506 return false;
507 } else {
508 return true;
509 }
510 });
511 if (mCurrentPhaseError) {
512 mErrorQueue.push(*mCurrentPhaseError);
513
514 processComponentError(mCurrentPhaseError->source);
515 mCurrentPhaseError = nullptr;
516 std::queue<EngineCommand> empty;
517 std::swap(mCommandQueue, empty);
518 continue;
519 }
520 EngineCommand ec = mCommandQueue.front();
521 mCommandQueue.pop();
522 switch (ec.cmdType) {
523 case EngineCommand::Type::BROADCAST_CONFIG:
524 LOG(INFO) << "Engine::Received broacast config request";
525 (void)broadcastClientConfig();
526 break;
527 case EngineCommand::Type::BROADCAST_START_RUN:
528 LOG(INFO) << "Engine::Received broacast run request";
529 (void)broadcastStartRun();
530 break;
531 case EngineCommand::Type::BROADCAST_INITIATE_STOP:
532 if (ec.source.find("ClientInterface") != std::string::npos) {
533 mStopFromClient = true;
534 }
535 LOG(INFO) << "Engine::Received broacast stop with flush request";
536 broadcastStopWithFlush();
537 break;
538 case EngineCommand::Type::POLL_COMPLETE:
539 LOG(INFO) << "Engine::Received Poll stream managers for completion request";
540 int id = getStreamIdFromSource(ec.source);
541 bool all_done = true;
542 for (auto& it : mStreamManagers) {
543 if (it.first == id) {
544 continue;
545 }
546 if (it.second->getState() != StreamManager::State::STOPPED) {
547 all_done = false;
548 }
549 }
550 if (all_done) {
551 broadcastStopComplete();
552 }
553 break;
554 }
555 }
556}
557
558void DefaultEngine::processComponentError(std::string source) {
559 if (mCurrentPhase == kRunPhase || mCurrentPhase == kStopPhase) {
560 (void)broadcastHalt();
561 }
562 if (source.find("ClientInterface") != std::string::npos) {
563 (void)broadcastReset();
564 }
565}
566
567void DefaultEngine::queueCommand(std::string source, EngineCommand::Type type) {
568 mCommandQueue.push(EngineCommand(source, type));
569 mWakeLooper.notify_all();
570}
571
572void DefaultEngine::queueError(std::string source, std::string msg, bool fatal) {
573 std::lock_guard<std::mutex> lock(mEngineLock);
574 // current phase already has an error report
575 if (!mCurrentPhaseError) {
576 mCurrentPhaseError = std::make_unique<ComponentError>(source, msg, mCurrentPhase, fatal);
577 mWakeLooper.notify_all();
578 }
579}
580
581/**
582 * InputCallback implementation
583 */
584InputCallback::InputCallback(int id, const std::function<void(int)>&& cb,
585 const std::function<Status(int, const InputFrame&)>&& packetCb)
586 : mErrorCallback(cb), mPacketHandler(packetCb), mInputId(id) {
587}
588
589Status InputCallback::dispatchInputFrame(int streamId, const InputFrame& frame) {
590 return mPacketHandler(streamId, frame);
591}
592void InputCallback::notifyInputError() {
593 mErrorCallback(mInputId);
594}
595
596/**
597 * StreamCallback implementation
598 */
599StreamCallback::StreamCallback(
600 const std::function<void()>&& eos, const std::function<void(std::string)>&& errorCb,
601 const std::function<Status(const std::shared_ptr<MemHandle>&)>&& packetHandler)
602 : mErrorHandler(errorCb), mEndOfStreamHandler(eos), mPacketHandler(packetHandler) {
603}
604
605void StreamCallback::notifyError(std::string msg) {
606 mErrorHandler(msg);
607}
608
609void StreamCallback::notifyEndOfStream() {
610 mEndOfStreamHandler();
611}
612
613Status StreamCallback::dispatchPacket(const std::shared_ptr<MemHandle>& packet) {
614 return mPacketHandler(packet);
615}
616
617} // namespace engine
618} // namespace runner
619} // namespace computepipe
620} // namespace automotive
621} // namespace android