blob: df683910690f78c96b50acd3403081d0a2020340 [file] [log] [blame]
Igor Murashkine54aebc2019-01-22 17:58:51 -08001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "common/debug.h"
18#include "common/expected.h"
19#include "manager/event_manager.h"
20#include "perfetto/rx_producer.h"
Igor Murashkinf2e01062019-04-12 15:57:55 -070021#include "prefetcher/read_ahead.h"
22#include "prefetcher/task_id.h"
Igor Murashkine54aebc2019-01-22 17:58:51 -080023
24#include <android-base/properties.h>
25#include <rxcpp/rx.hpp>
26
27#include <atomic>
28#include <functional>
29
30using rxcpp::observe_on_one_worker;
31
32namespace iorap::manager {
33
Igor Murashkine54aebc2019-01-22 17:58:51 -080034using binder::AppLaunchEvent;
Igor Murashkina128a862019-02-01 13:26:37 -080035using binder::JobScheduledEvent;
36using binder::RequestId;
37using binder::TaskResult;
38
Igor Murashkine54aebc2019-01-22 17:58:51 -080039using perfetto::PerfettoStreamCommand;
40using perfetto::PerfettoTraceProto;
41
42struct AppComponentName {
43 std::string package;
44 std::string activity_name;
45
46 static bool HasAppComponentName(const std::string& s) {
47 return s.find('/') != std::string::npos;
48 }
49
50 // "com.foo.bar/.A" -> {"com.foo.bar", ".A"}
51 static AppComponentName FromString(const std::string& s) {
52 constexpr const char delimiter = '/';
53 std::string package = s.substr(0, delimiter);
54
55 std::string activity_name = s;
56 activity_name.erase(0, s.find(delimiter) + sizeof(delimiter));
57
58 return {std::move(package), std::move(activity_name)};
59 }
60
61 // {"com.foo.bar", ".A"} -> "com.foo.bar/.A"
62 std::string ToString() const {
63 return package + "/" + activity_name;
64 }
65
66 /*
67 * '/' is encoded into %2F
68 * '%' is encoded into %25
69 *
70 * This allows the component name to be be used as a file name
71 * ('/' is illegal due to being a path separator) with minimal
72 * munging.
73 */
74
75 // "com.foo.bar%2F.A%25" -> {"com.foo.bar", ".A%"}
76 static AppComponentName FromUrlEncodedString(const std::string& s) {
77 std::string cpy = s;
78 Replace(cpy, "%2F", "/");
79 Replace(cpy, "%25", "%");
80
81 return FromString(cpy);
82 }
83
84 // {"com.foo.bar", ".A%"} -> "com.foo.bar%2F.A%25"
85 std::string ToUrlEncodedString() const {
86 std::string s = ToString();
87 Replace(s, "%", "%25");
88 Replace(s, "/", "%2F");
89 return s;
90 }
91
Yan Wangcb4cc8c2019-08-19 12:34:06 -070092 /*
93 * '/' is encoded into @@
94 * '%' is encoded into ^^
95 *
96 * Two purpose:
Yan Wang2fa45ca2019-08-28 01:45:54 +000097 * 1. This allows the component name to be used as a file name
Yan Wangcb4cc8c2019-08-19 12:34:06 -070098 * ('/' is illegal due to being a path separator) with minimal
99 * munging.
Yan Wang2fa45ca2019-08-28 01:45:54 +0000100 * 2. This allows the component name to be used in .mk file because
Yan Wangcb4cc8c2019-08-19 12:34:06 -0700101 * '%' is a special char and cannot be easily escaped in Makefile.
102 *
Yan Wang2fa45ca2019-08-28 01:45:54 +0000103 * This is a workround for test purpose.
Yan Wangcb4cc8c2019-08-19 12:34:06 -0700104 * Hopefully, the double "@@" and "^^" are not used in other cases.
105 */
Yan Wang2fa45ca2019-08-28 01:45:54 +0000106
107 // "com.foo.bar@@.A^^25" -> {"com.foo.bar", ".A%"}
108 static AppComponentName FromMakFileSafeEncodedString(const std::string& s) {
109 std::string cpy = s;
110 Replace(cpy, "@@", "/");
111 Replace(cpy, "^^", "%");
112
113 return FromString(cpy);
114 }
115
116 // {"com.foo.bar", ".A%"} -> "com.foo.bar@@.A^^"
117 std::string ToMakeFileSafeEncodedString() const {
118 std::string s = ToString();
Yan Wangcb4cc8c2019-08-19 12:34:06 -0700119 Replace(s, "/", "@@");
120 Replace(s, "%", "^^");
121 return s;
122 }
123
Igor Murashkine54aebc2019-01-22 17:58:51 -0800124 private:
125 static bool Replace(std::string& str, const std::string& from, const std::string& to) {
126 // TODO: call in a loop to replace all occurrences, not just the first one.
127 const size_t start_pos = str.find(from);
128 if (start_pos == std::string::npos) {
129 return false;
130 }
131
132 str.replace(start_pos, from.length(), to);
133
134 return true;
135}
136};
137
138std::ostream& operator<<(std::ostream& os, const AppComponentName& name) {
139 os << name.ToString();
140 return os;
141}
142
143// Main logic of the #OnAppLaunchEvent scan method.
144//
145// All functions are called from the same thread as the event manager
146// functions.
147//
148// This is a data type, it's moved (std::move) around from one iteration
149// of #scan to another.
150struct AppLaunchEventState {
151 std::optional<AppComponentName> component_name_;
Igor Murashkinf2e01062019-04-12 15:57:55 -0700152 // Sequence ID is shared amongst the same app launch sequence,
153 // but changes whenever a new app launch sequence begins.
154 size_t sequence_id_ = static_cast<size_t>(-1);
Igor Murashkine54aebc2019-01-22 17:58:51 -0800155
Igor Murashkinf2e01062019-04-12 15:57:55 -0700156 prefetcher::ReadAhead read_ahead_;
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700157 bool allowed_readahead_{true};
Igor Murashkinf2e01062019-04-12 15:57:55 -0700158 bool is_read_ahead_{false};
159 std::optional<prefetcher::TaskId> read_ahead_task_;
160
161 bool allowed_tracing_{true};
Igor Murashkine54aebc2019-01-22 17:58:51 -0800162 bool is_tracing_{false};
163 std::optional<rxcpp::composite_subscription> rx_lifetime_;
164 std::vector<rxcpp::composite_subscription> rx_in_flight_;
165
166 borrowed<perfetto::RxProducerFactory*> perfetto_factory_; // not null
167 borrowed<observe_on_one_worker*> thread_; // not null
168 borrowed<observe_on_one_worker*> io_thread_; // not null
169
170 explicit AppLaunchEventState(borrowed<perfetto::RxProducerFactory*> perfetto_factory,
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700171 bool allowed_readahead,
Igor Murashkinf2e01062019-04-12 15:57:55 -0700172 bool allowed_tracing,
Igor Murashkine54aebc2019-01-22 17:58:51 -0800173 borrowed<observe_on_one_worker*> thread,
174 borrowed<observe_on_one_worker*> io_thread) {
175 perfetto_factory_ = perfetto_factory;
176 DCHECK(perfetto_factory_ != nullptr);
177
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700178 allowed_readahead_ = allowed_readahead;
Igor Murashkinf2e01062019-04-12 15:57:55 -0700179 allowed_tracing_ = allowed_tracing;
180
Igor Murashkine54aebc2019-01-22 17:58:51 -0800181 thread_ = thread;
182 DCHECK(thread_ != nullptr);
183
184 io_thread_ = io_thread;
185 DCHECK(io_thread_ != nullptr);
186 }
187
188 // Updates the values in this struct only as a side effect.
189 //
190 // May create and fire a new rx chain on the same threads as passed
191 // in by the constructors.
192 void OnNewEvent(const AppLaunchEvent& event) {
193 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: " << event;
194
195 using Type = AppLaunchEvent::Type;
196
Igor Murashkinf2e01062019-04-12 15:57:55 -0700197 DCHECK_GE(event.sequence_id, 0);
198 sequence_id_ = static_cast<size_t>(event.sequence_id);
199
Igor Murashkine54aebc2019-01-22 17:58:51 -0800200 switch (event.type) {
201 case Type::kIntentStarted: {
202 DCHECK(!IsTracing());
203 // Optimistically start tracing if we have the activity in the intent.
204 if (!event.intent_proto->has_component()) {
205 // Can't do anything if there is no component in the proto.
206 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: no component, can't trace";
207 break;
208 }
209
210 const std::string& package_name = event.intent_proto->component().package_name();
211 const std::string& class_name = event.intent_proto->component().class_name();
212 AppComponentName component_name{package_name, class_name};
213
214 component_name_ = component_name;
Igor Murashkinf2e01062019-04-12 15:57:55 -0700215
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700216 if (allowed_readahead_) {
217 StartReadAhead(sequence_id_, component_name);
218 }
Igor Murashkinf2e01062019-04-12 15:57:55 -0700219 if (allowed_tracing_) {
220 rx_lifetime_ = StartTracing(std::move(component_name));
221 }
Igor Murashkine54aebc2019-01-22 17:58:51 -0800222
223 break;
224 }
225 case Type::kIntentFailed:
Igor Murashkinf2e01062019-04-12 15:57:55 -0700226 if (allowed_tracing_) {
227 AbortTrace();
228 }
229 AbortReadAhead();
Igor Murashkine54aebc2019-01-22 17:58:51 -0800230 break;
231 case Type::kActivityLaunched: {
232 // Cancel tracing for warm/hot.
233 // Restart tracing if the activity was unexpected.
234
235 AppLaunchEvent::Temperature temperature = event.temperature;
236 if (temperature != AppLaunchEvent::Temperature::kCold) {
237 LOG(DEBUG) << "AppLaunchEventState#OnNewEvent aborting trace due to non-cold temperature";
Igor Murashkinf2e01062019-04-12 15:57:55 -0700238
239 if (allowed_tracing_) {
240 AbortTrace();
241 }
242 AbortReadAhead();
243 } else if (!IsTracing() || !IsReadAhead()) { // and the temperature is Cold.
Igor Murashkine54aebc2019-01-22 17:58:51 -0800244 // Start late trace when intent didn't have a component name
245 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent need to start new trace";
246
247 const std::string& title = event.activity_record_proto->identifier().title();
248 if (!AppComponentName::HasAppComponentName(title)) {
249 // Proto comment claim this is sometimes a window title.
250 // We need the actual 'package/component' here, so just ignore it if it's a title.
251 LOG(WARNING) << "App launched without a component name: " << event;
252 break;
253 }
254
255 AppComponentName component_name = AppComponentName::FromString(title);
256
257 component_name_ = component_name;
Igor Murashkinf2e01062019-04-12 15:57:55 -0700258
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700259 if (allowed_readahead_ && !IsReadAhead()) {
260 StartReadAhead(sequence_id_, component_name);
261 }
Igor Murashkinf2e01062019-04-12 15:57:55 -0700262 if (allowed_tracing_ && !IsTracing()) {
263 rx_lifetime_ = StartTracing(std::move(component_name));
264 }
Igor Murashkine54aebc2019-01-22 17:58:51 -0800265 } else {
266 // FIXME: match actual component name against intent component name.
267 // abort traces if they don't match.
268
Igor Murashkinf2e01062019-04-12 15:57:55 -0700269 if (allowed_tracing_) {
270 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already tracing";
271 }
272 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already doing readahead";
Igor Murashkine54aebc2019-01-22 17:58:51 -0800273 }
274 break;
275 }
276 case Type::kActivityLaunchFinished:
277 // Finish tracing and collect trace buffer.
278 //
279 // TODO: this happens automatically when perfetto finishes its
280 // trace duration.
281 if (IsTracing()) {
282 MarkPendingTrace();
283 }
Igor Murashkinf2e01062019-04-12 15:57:55 -0700284 if (IsReadAhead()) {
285 FinishReadAhead();
286 }
Igor Murashkine54aebc2019-01-22 17:58:51 -0800287 break;
288 case Type::kActivityLaunchCancelled:
289 // Abort tracing.
Igor Murashkinf2e01062019-04-12 15:57:55 -0700290 if (allowed_tracing_) {
291 AbortTrace();
292 }
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700293 if (IsReadAhead()) {
294 AbortReadAhead();
295 }
Igor Murashkine54aebc2019-01-22 17:58:51 -0800296 break;
297 default:
298 DCHECK(false) << "invalid type: " << event; // binder layer should've rejected this.
299 LOG(ERROR) << "invalid type: " << event; // binder layer should've rejected this.
300 }
301 }
302
Igor Murashkinf2e01062019-04-12 15:57:55 -0700303 // Is there an in-flight readahead task currently?
304 bool IsReadAhead() const {
305 return read_ahead_task_.has_value();
306 }
307
308 void StartReadAhead(size_t id, const AppComponentName& component_name) {
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700309 DCHECK(allowed_readahead_);
Igor Murashkinf2e01062019-04-12 15:57:55 -0700310 DCHECK(!IsReadAhead());
311
Yan Wangcb4cc8c2019-08-19 12:34:06 -0700312 // This is changed from "/data/misc/iorapd/" for testing purpose.
313 // Should be restored b/139831359.
314 std::string file_path = "/product/iorap-trace/";
Yan Wang2fa45ca2019-08-28 01:45:54 +0000315 file_path += component_name.ToMakeFileSafeEncodedString();
Igor Murashkinf2e01062019-04-12 15:57:55 -0700316 file_path += ".compiled_trace.pb";
317
318 prefetcher::TaskId task{id, std::move(file_path)};
319 read_ahead_.BeginTask(task);
320 // TODO: non-void return signature?
321
322 read_ahead_task_ = std::move(task);
323 }
324
325 void FinishReadAhead() {
326 DCHECK(IsReadAhead());
327
328 read_ahead_.FinishTask(*read_ahead_task_);
329 read_ahead_task_ = std::nullopt;
330 }
331
332 void AbortReadAhead() {
333 FinishReadAhead();
334 }
335
Igor Murashkine54aebc2019-01-22 17:58:51 -0800336 bool IsTracing() const {
337 return is_tracing_;
338 }
339
340 rxcpp::composite_subscription StartTracing(AppComponentName component_name) {
Igor Murashkinf2e01062019-04-12 15:57:55 -0700341 DCHECK(allowed_tracing_);
Igor Murashkine54aebc2019-01-22 17:58:51 -0800342 DCHECK(!IsTracing());
343
344 auto /*observable<PerfettoStreamCommand>*/ perfetto_commands =
345 rxcpp::observable<>::just(PerfettoStreamCommand::kStartTracing)
346 // wait 1x
347 .concat(
348 // Pick a value longer than the perfetto config delay_ms, so that we send
349 // 'kShutdown' after tracing has already finished.
350 rxcpp::observable<>::interval(std::chrono::milliseconds(10000))
351 .take(2) // kStopTracing, kShutdown.
352 .map([](int value) {
353 // value is 1,2,3,...
354 return static_cast<PerfettoStreamCommand>(value); // 1,2, ...
355 })
356 );
357
358 auto /*observable<PerfettoTraceProto>*/ trace_proto_stream =
359 perfetto_factory_->CreateTraceStream(perfetto_commands);
360 // This immediately connects to perfetto asynchronously.
361 //
362 // TODO: create a perfetto handle earlier, to minimize perfetto startup latency.
363
364 rxcpp::composite_subscription lifetime;
365
366 trace_proto_stream
367 .tap([](const PerfettoTraceProto& trace_proto) {
368 LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (1)";
369 })
370 .observe_on(*thread_) // All work prior to 'observe_on' is handled on thread_.
371 .subscribe_on(*thread_) // All work prior to 'observe_on' is handled on thread_.
372 .observe_on(*io_thread_) // Write data on an idle-class-priority thread.
373 .tap([](const PerfettoTraceProto& trace_proto) {
374 LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (2)";
375 })
376 .as_blocking() // TODO: remove.
377 .subscribe(/*out*/lifetime,
378 /*on_next*/[component_name]
379 (PerfettoTraceProto trace_proto) {
380 std::string file_path = "/data/misc/iorapd/";
381 file_path += component_name.ToUrlEncodedString();
382 file_path += ".perfetto_trace.pb";
383
384 // TODO: timestamp each file into a subdirectory.
385
386 if (!trace_proto.WriteFullyToFile(file_path)) {
387 LOG(ERROR) << "Failed to save TraceBuffer to " << file_path;
388 } else {
389 LOG(INFO) << "Perfetto TraceBuffer saved to file: " << file_path;
390 }
391 },
392 /*on_error*/[](rxcpp::util::error_ptr err) {
393 LOG(ERROR) << "Perfetto trace proto collection error: " << rxcpp::util::what(err);
394 });
395
396 is_tracing_ = true;
397
398 return lifetime;
399 }
400
401 void AbortTrace() {
402 LOG(VERBOSE) << "AppLaunchEventState - AbortTrace";
Igor Murashkinf2e01062019-04-12 15:57:55 -0700403 DCHECK(allowed_tracing_);
404
Igor Murashkine54aebc2019-01-22 17:58:51 -0800405 is_tracing_ = false;
406 if (rx_lifetime_) {
407 // TODO: it would be good to call perfetto Destroy.
408
409 LOG(VERBOSE) << "AppLaunchEventState - AbortTrace - Unsubscribe";
410 rx_lifetime_->unsubscribe();
411 rx_lifetime_.reset();
412 }
413 }
414
415 void MarkPendingTrace() {
416 LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace";
Igor Murashkinf2e01062019-04-12 15:57:55 -0700417 DCHECK(allowed_tracing_);
Igor Murashkine54aebc2019-01-22 17:58:51 -0800418 DCHECK(is_tracing_);
419 DCHECK(rx_lifetime_.has_value());
420
421 if (rx_lifetime_) {
422 LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime moved";
423 // Don't unsubscribe because that would cause the perfetto TraceBuffer
424 // to get dropped on the floor.
425 //
426 // Instead, we want to let it finish and write it out to a file.
427 rx_in_flight_.push_back(*std::move(rx_lifetime_));
428 rx_lifetime_.reset();
429 } else {
430 LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime was empty";
431 }
432
433 // FIXME: how do we clear this vector?
434 }
435};
436
437// Convert callback pattern into reactive pattern.
438struct AppLaunchEventSubject {
439 using RefWrapper =
440 std::reference_wrapper<const AppLaunchEvent>;
441
442 AppLaunchEventSubject() {}
443
444 void Subscribe(rxcpp::subscriber<RefWrapper> subscriber) {
445 DCHECK(ready_ != true) << "Cannot Subscribe twice";
446
447 subscriber_ = std::move(subscriber);
448
449 // Release edge of synchronizes-with AcquireIsReady.
450 ready_.store(true);
451 }
452
453 void OnNext(const AppLaunchEvent& e) {
454 if (!AcquireIsReady()) {
455 return;
456 }
457
458 if (!subscriber_->is_subscribed()) {
459 return;
460 }
461
462 /*
463 * TODO: fix upstream.
464 *
465 * Rx workaround: this fails to compile when
466 * the observable is a reference type:
467 *
468 * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:354:18: error: multiple overloads of 'on_next' instantiate to the same signature 'void (const iorap::binder::AppLaunchEvent &) const'
469 * virtual void on_next(T&&) const {};
470 *
471 * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:353:18: note: previous declaration is here
472 * virtual void on_next(T&) const {};
473 *
474 * (The workaround is to use reference_wrapper instead
475 * of const AppLaunchEvent&)
476 */
477 subscriber_->on_next(std::cref(e));
478
479 }
480
481 void OnCompleted() {
482 if (!AcquireIsReady()) {
483 return;
484 }
485
486 subscriber_->on_completed();
487 }
488
489 private:
490 bool AcquireIsReady() {
491 // Synchronizes-with the release-edge in Subscribe.
492 // This can happen much later, only once the subscription actually happens.
493
494 // However, as far as I know, 'rxcpp::subscriber' is not thread safe,
495 // (but the observable chain itself can be made thread-safe via #observe_on, etc).
496 // so we must avoid reading it until it has been fully synchronized.
497 //
498 // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics,
499 // to make it simpler.
500 return ready_.load();
501 }
502
503 // TODO: also track the RequestId ?
504
505 std::atomic<bool> ready_{false};
506
507
508 std::optional<rxcpp::subscriber<RefWrapper>> subscriber_;
509};
510
Igor Murashkina128a862019-02-01 13:26:37 -0800511// Convert callback pattern into reactive pattern.
512struct JobScheduledEventSubject {
513 JobScheduledEventSubject() {}
514
515 void Subscribe(rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>> subscriber) {
516 DCHECK(ready_ != true) << "Cannot Subscribe twice";
517
518 subscriber_ = std::move(subscriber);
519
520 // Release edge of synchronizes-with AcquireIsReady.
521 ready_.store(true);
522 }
523
524 void OnNext(RequestId request_id, JobScheduledEvent e) {
525 if (!AcquireIsReady()) {
526 return;
527 }
528
529 if (!subscriber_->is_subscribed()) {
530 return;
531 }
532
533 subscriber_->on_next(std::pair<RequestId, JobScheduledEvent>{std::move(request_id), std::move(e)});
534
535 }
536
537 void OnCompleted() {
538 if (!AcquireIsReady()) {
539 return;
540 }
541
542 subscriber_->on_completed();
543 }
544
545 private:
546 bool AcquireIsReady() {
547 // Synchronizes-with the release-edge in Subscribe.
548 // This can happen much later, only once the subscription actually happens.
549
550 // However, as far as I know, 'rxcpp::subscriber' is not thread safe,
551 // (but the observable chain itself can be made thread-safe via #observe_on, etc).
552 // so we must avoid reading it until it has been fully synchronized.
553 //
554 // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics,
555 // to make it simpler.
556 return ready_.load();
557 }
558
559 // TODO: also track the RequestId ?
560
561 std::atomic<bool> ready_{false};
562
563 std::optional<rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>>> subscriber_;
564};
565
Igor Murashkine54aebc2019-01-22 17:58:51 -0800566class EventManager::Impl {
567 public:
568 Impl(/*borrow*/perfetto::RxProducerFactory& perfetto_factory)
569 : perfetto_factory_(perfetto_factory),
570 worker_thread_(rxcpp::observe_on_new_thread()),
571 worker_thread2_(rxcpp::observe_on_new_thread()),
572 io_thread_(perfetto::ObserveOnNewIoThread()) {
573
574 // TODO: read all properties from one config class.
575 tracing_allowed_ = ::android::base::GetBoolProperty("iorapd.perfetto.enable", /*default*/false);
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700576 readahead_allowed_ = ::android::base::GetBoolProperty("iorapd.readahead.enable",
577 /*default*/false);
Igor Murashkine54aebc2019-01-22 17:58:51 -0800578
Igor Murashkinf2e01062019-04-12 15:57:55 -0700579 rx_lifetime_ = InitializeRxGraph();
Igor Murashkina128a862019-02-01 13:26:37 -0800580 rx_lifetime_jobs_ = InitializeRxGraphForJobScheduledEvents();
581 }
582
583 void SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) {
584 DCHECK(callbacks_.expired());
585 callbacks_ = callbacks;
Igor Murashkine54aebc2019-01-22 17:58:51 -0800586 }
587
588 bool OnAppLaunchEvent(RequestId request_id,
589 const AppLaunchEvent& event) {
590 LOG(VERBOSE) << "EventManager::OnAppLaunchEvent("
591 << "request_id=" << request_id.request_id << ","
592 << event;
593
594 app_launch_event_subject_.OnNext(event);
595
596 return true;
597 }
598
Igor Murashkina128a862019-02-01 13:26:37 -0800599 bool OnJobScheduledEvent(RequestId request_id,
600 const JobScheduledEvent& event) {
601 LOG(VERBOSE) << "EventManager::OnJobScheduledEvent("
602 << "request_id=" << request_id.request_id << ",event=TODO).";
603
604 job_scheduled_event_subject_.OnNext(std::move(request_id), event);
605
606 return true; // No errors.
607 }
608
Igor Murashkine54aebc2019-01-22 17:58:51 -0800609 rxcpp::composite_subscription InitializeRxGraph() {
610 LOG(VERBOSE) << "EventManager::InitializeRxGraph";
611
612 app_launch_events_ = rxcpp::observable<>::create<AppLaunchEventRefWrapper>(
613 [&](rxcpp::subscriber<AppLaunchEventRefWrapper> subscriber) {
614 app_launch_event_subject_.Subscribe(std::move(subscriber));
615 });
616
617 rxcpp::composite_subscription lifetime;
618
Igor Murashkinf2e01062019-04-12 15:57:55 -0700619 if (!tracing_allowed_) {
620 LOG(WARNING) << "Tracing disabled by iorapd.perfetto.enable=false";
621 }
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700622 if (!readahead_allowed_) {
623 LOG(WARNING) << "Readahead disabled by iorapd.readahead.enable=false";
624 }
Igor Murashkinf2e01062019-04-12 15:57:55 -0700625
626 AppLaunchEventState initial_state{&perfetto_factory_,
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700627 readahead_allowed_,
Igor Murashkinf2e01062019-04-12 15:57:55 -0700628 tracing_allowed_,
629 &worker_thread2_,
630 &io_thread_};
Igor Murashkine54aebc2019-01-22 17:58:51 -0800631 app_launch_events_
632 .subscribe_on(worker_thread_)
633 .scan(std::move(initial_state),
634 [](AppLaunchEventState state, AppLaunchEventRefWrapper event) {
635 state.OnNewEvent(event.get());
636 return state;
637 })
638 .subscribe(/*out*/lifetime, [](const AppLaunchEventState& state) {
639 // Intentionally left blank.
640 (void)state;
641 });
642
643 return lifetime;
644 }
645
Igor Murashkina128a862019-02-01 13:26:37 -0800646 rxcpp::composite_subscription InitializeRxGraphForJobScheduledEvents() {
647 LOG(VERBOSE) << "EventManager::InitializeRxGraphForJobScheduledEvents";
648
649 using RequestAndJobEvent = std::pair<RequestId, JobScheduledEvent>;
650
651 job_scheduled_events_ = rxcpp::observable<>::create<RequestAndJobEvent>(
652 [&](rxcpp::subscriber<RequestAndJobEvent> subscriber) {
653 job_scheduled_event_subject_.Subscribe(std::move(subscriber));
654 });
655
656 rxcpp::composite_subscription lifetime;
657
658 job_scheduled_events_
659 .observe_on(worker_thread_) // async handling.
660 .tap([this](const RequestAndJobEvent& e) {
661 LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(1) - job begins";
662 this->NotifyProgress(e.first, TaskResult{TaskResult::State::kBegan});
663
664 // TODO: probably this shouldn't be emitted until most of the usual DCHECKs
665 // (for example, validate a job isn't already started, the request is not reused, etc).
666 // In this way we could block from the client until it sees 'kBegan' and Log.wtf otherwise.
667 })
668 .tap([](const RequestAndJobEvent& e) {
669 // TODO. Actual work.
670 LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(2) - job is being processed";
671
672 // TODO: abort functionality for in-flight jobs.
673 //
674 // maybe something like scan that returns an observable<Job> + flat map to that job.
675 // then we could unsubscribe from the scan to do a partial abort? need to try it and see if it works.
676 //
677 // other option is to create a new outer subscription for each job id which seems less ideal.
678 })
679 .subscribe(/*out*/lifetime,
680 /*on_next*/
681 [this](const RequestAndJobEvent& e) {
682 LOG(VERBOSE) << "EventManager#JobScheduledEvent#subscribe - job completed";
683 this->NotifyComplete(e.first, TaskResult{TaskResult::State::kCompleted});
684 }
685#if 0
686 ,
687 /*on_error*/
688 [](rxcpp::util::error_ptr err) {
689 LOG(ERROR) << "Scheduled job event failed: " << rxcpp::util::what(err);
690
691 //std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock();
692 //if (callbacks != nullptr) {
693 // FIXME: How do we get the request ID back out of the error? Seems like a problem.
694 // callbacks->OnComplete(, TaskResult{TaskResult::kError});
695 // We may have to wrap with an iorap::expected instead of using on_error.
696 //}
697
698 // FIXME: need to add a 'OnErrorResumeNext' operator?
699 DCHECK(false) << "forgot to implement OnErrorResumeNext";
700 }
701#endif
702 );
703
704 // TODO: error output should happen via an observable.
705
706 return lifetime;
707 }
708
709 void NotifyComplete(RequestId request_id, TaskResult result) {
710 std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock();
711 if (callbacks != nullptr) {
712 callbacks->OnComplete(std::move(request_id), std::move(result));
713 } else {
714 LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early";
715 }
716 }
717
718 void NotifyProgress(RequestId request_id, TaskResult result) {
719 std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock();
720 if (callbacks != nullptr) {
721 callbacks->OnProgress(std::move(request_id), std::move(result));
722 } else {
723 LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early";
724 }
725 }
726
Igor Murashkina7bc40a2019-06-10 16:04:56 -0700727 bool readahead_allowed_{true};
728
Igor Murashkine54aebc2019-01-22 17:58:51 -0800729 perfetto::RxProducerFactory& perfetto_factory_;
730 bool tracing_allowed_{true};
731
Igor Murashkina128a862019-02-01 13:26:37 -0800732 std::weak_ptr<TaskResultCallbacks> callbacks_; // avoid cycles with weakptr.
733
Igor Murashkine54aebc2019-01-22 17:58:51 -0800734 using AppLaunchEventRefWrapper = AppLaunchEventSubject::RefWrapper;
735 rxcpp::observable<AppLaunchEventRefWrapper> app_launch_events_;
736 AppLaunchEventSubject app_launch_event_subject_;
737
Igor Murashkina128a862019-02-01 13:26:37 -0800738 rxcpp::observable<std::pair<RequestId, JobScheduledEvent>> job_scheduled_events_;
739 JobScheduledEventSubject job_scheduled_event_subject_;
740
Igor Murashkine54aebc2019-01-22 17:58:51 -0800741 rxcpp::observable<RequestId> completed_requests_;
742
743 // regular-priority thread to handle binder callbacks.
744 observe_on_one_worker worker_thread_;
745 observe_on_one_worker worker_thread2_;
746 // low priority idle-class thread for IO operations.
747 observe_on_one_worker io_thread_;
748
Igor Murashkina128a862019-02-01 13:26:37 -0800749 rxcpp::composite_subscription rx_lifetime_; // app launch events
750 rxcpp::composite_subscription rx_lifetime_jobs_; // job scheduled events
Igor Murashkine54aebc2019-01-22 17:58:51 -0800751
752//INTENTIONAL_COMPILER_ERROR_HERE:
753 // FIXME:
754 // ok so we want to expose a 'BlockingSubscribe' or a 'Subscribe' or some kind of function
755 // that the main thread can call. This would subscribe on all the observables we internally
756 // have here (probably on an event-manager-dedicated thread for simplicity).
757 //
758 // ideally we'd just reuse the binder thread to handle the events but I'm not super sure,
759 // maybe this already works with the identity_current_thread coordination?
760};
761using Impl = EventManager::Impl;
762
763EventManager::EventManager(perfetto::RxProducerFactory& perfetto_factory)
764 : impl_(new Impl(perfetto_factory)) {}
765
766std::shared_ptr<EventManager> EventManager::Create() {
767 static perfetto::PerfettoDependencies::Injector injector{
768 perfetto::PerfettoDependencies::CreateComponent
769 };
770 static perfetto::RxProducerFactory producer_factory{
771 /*borrow*/injector
772 };
773 return EventManager::Create(/*borrow*/producer_factory);
774}
775
776std::shared_ptr<EventManager> EventManager::Create(perfetto::RxProducerFactory& perfetto_factory) {
777 std::shared_ptr<EventManager> p{new EventManager{/*borrow*/perfetto_factory}};
778 return p;
779}
780
Igor Murashkina128a862019-02-01 13:26:37 -0800781void EventManager::SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) {
782 return impl_->SetTaskResultCallbacks(std::move(callbacks));
783}
784
Igor Murashkine54aebc2019-01-22 17:58:51 -0800785bool EventManager::OnAppLaunchEvent(RequestId request_id,
786 const AppLaunchEvent& event) {
787 return impl_->OnAppLaunchEvent(request_id, event);
788}
789
Igor Murashkina128a862019-02-01 13:26:37 -0800790bool EventManager::OnJobScheduledEvent(RequestId request_id,
791 const JobScheduledEvent& event) {
792 return impl_->OnJobScheduledEvent(request_id, event);
793}
794
Igor Murashkine54aebc2019-01-22 17:58:51 -0800795} // namespace iorap::manager