blob: 03cfd1486a82d1d0cf9dd1c91d79adb58a789ae0 [file] [log] [blame]
Igor Murashkine54aebc2019-01-22 17:58:51 -08001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "common/debug.h"
18#include "common/expected.h"
19#include "manager/event_manager.h"
20#include "perfetto/rx_producer.h"
Igor Murashkinf2e01062019-04-12 15:57:55 -070021#include "prefetcher/read_ahead.h"
22#include "prefetcher/task_id.h"
Igor Murashkine54aebc2019-01-22 17:58:51 -080023
24#include <android-base/properties.h>
25#include <rxcpp/rx.hpp>
26
27#include <atomic>
28#include <functional>
29
30using rxcpp::observe_on_one_worker;
31
32namespace iorap::manager {
33
Igor Murashkine54aebc2019-01-22 17:58:51 -080034using binder::AppLaunchEvent;
Igor Murashkina128a862019-02-01 13:26:37 -080035using binder::JobScheduledEvent;
36using binder::RequestId;
37using binder::TaskResult;
38
Igor Murashkine54aebc2019-01-22 17:58:51 -080039using perfetto::PerfettoStreamCommand;
40using perfetto::PerfettoTraceProto;
41
42struct AppComponentName {
43 std::string package;
44 std::string activity_name;
45
46 static bool HasAppComponentName(const std::string& s) {
47 return s.find('/') != std::string::npos;
48 }
49
50 // "com.foo.bar/.A" -> {"com.foo.bar", ".A"}
51 static AppComponentName FromString(const std::string& s) {
52 constexpr const char delimiter = '/';
53 std::string package = s.substr(0, delimiter);
54
55 std::string activity_name = s;
56 activity_name.erase(0, s.find(delimiter) + sizeof(delimiter));
57
58 return {std::move(package), std::move(activity_name)};
59 }
60
61 // {"com.foo.bar", ".A"} -> "com.foo.bar/.A"
62 std::string ToString() const {
63 return package + "/" + activity_name;
64 }
65
66 /*
67 * '/' is encoded into %2F
68 * '%' is encoded into %25
69 *
70 * This allows the component name to be be used as a file name
71 * ('/' is illegal due to being a path separator) with minimal
72 * munging.
73 */
74
75 // "com.foo.bar%2F.A%25" -> {"com.foo.bar", ".A%"}
76 static AppComponentName FromUrlEncodedString(const std::string& s) {
77 std::string cpy = s;
78 Replace(cpy, "%2F", "/");
79 Replace(cpy, "%25", "%");
80
81 return FromString(cpy);
82 }
83
84 // {"com.foo.bar", ".A%"} -> "com.foo.bar%2F.A%25"
85 std::string ToUrlEncodedString() const {
86 std::string s = ToString();
87 Replace(s, "%", "%25");
88 Replace(s, "/", "%2F");
89 return s;
90 }
91
92 private:
93 static bool Replace(std::string& str, const std::string& from, const std::string& to) {
94 // TODO: call in a loop to replace all occurrences, not just the first one.
95 const size_t start_pos = str.find(from);
96 if (start_pos == std::string::npos) {
97 return false;
98 }
99
100 str.replace(start_pos, from.length(), to);
101
102 return true;
103}
104};
105
106std::ostream& operator<<(std::ostream& os, const AppComponentName& name) {
107 os << name.ToString();
108 return os;
109}
110
111// Main logic of the #OnAppLaunchEvent scan method.
112//
113// All functions are called from the same thread as the event manager
114// functions.
115//
116// This is a data type, it's moved (std::move) around from one iteration
117// of #scan to another.
118struct AppLaunchEventState {
119 std::optional<AppComponentName> component_name_;
Igor Murashkinf2e01062019-04-12 15:57:55 -0700120 // Sequence ID is shared amongst the same app launch sequence,
121 // but changes whenever a new app launch sequence begins.
122 size_t sequence_id_ = static_cast<size_t>(-1);
Igor Murashkine54aebc2019-01-22 17:58:51 -0800123
Igor Murashkinf2e01062019-04-12 15:57:55 -0700124 prefetcher::ReadAhead read_ahead_;
125 bool is_read_ahead_{false};
126 std::optional<prefetcher::TaskId> read_ahead_task_;
127
128 bool allowed_tracing_{true};
Igor Murashkine54aebc2019-01-22 17:58:51 -0800129 bool is_tracing_{false};
130 std::optional<rxcpp::composite_subscription> rx_lifetime_;
131 std::vector<rxcpp::composite_subscription> rx_in_flight_;
132
133 borrowed<perfetto::RxProducerFactory*> perfetto_factory_; // not null
134 borrowed<observe_on_one_worker*> thread_; // not null
135 borrowed<observe_on_one_worker*> io_thread_; // not null
136
137 explicit AppLaunchEventState(borrowed<perfetto::RxProducerFactory*> perfetto_factory,
Igor Murashkinf2e01062019-04-12 15:57:55 -0700138 bool allowed_tracing,
Igor Murashkine54aebc2019-01-22 17:58:51 -0800139 borrowed<observe_on_one_worker*> thread,
140 borrowed<observe_on_one_worker*> io_thread) {
141 perfetto_factory_ = perfetto_factory;
142 DCHECK(perfetto_factory_ != nullptr);
143
Igor Murashkinf2e01062019-04-12 15:57:55 -0700144 allowed_tracing_ = allowed_tracing;
145
Igor Murashkine54aebc2019-01-22 17:58:51 -0800146 thread_ = thread;
147 DCHECK(thread_ != nullptr);
148
149 io_thread_ = io_thread;
150 DCHECK(io_thread_ != nullptr);
151 }
152
153 // Updates the values in this struct only as a side effect.
154 //
155 // May create and fire a new rx chain on the same threads as passed
156 // in by the constructors.
157 void OnNewEvent(const AppLaunchEvent& event) {
158 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: " << event;
159
160 using Type = AppLaunchEvent::Type;
161
Igor Murashkinf2e01062019-04-12 15:57:55 -0700162 DCHECK_GE(event.sequence_id, 0);
163 sequence_id_ = static_cast<size_t>(event.sequence_id);
164
Igor Murashkine54aebc2019-01-22 17:58:51 -0800165 switch (event.type) {
166 case Type::kIntentStarted: {
167 DCHECK(!IsTracing());
168 // Optimistically start tracing if we have the activity in the intent.
169 if (!event.intent_proto->has_component()) {
170 // Can't do anything if there is no component in the proto.
171 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: no component, can't trace";
172 break;
173 }
174
175 const std::string& package_name = event.intent_proto->component().package_name();
176 const std::string& class_name = event.intent_proto->component().class_name();
177 AppComponentName component_name{package_name, class_name};
178
179 component_name_ = component_name;
Igor Murashkinf2e01062019-04-12 15:57:55 -0700180
181 StartReadAhead(sequence_id_, component_name);
182 if (allowed_tracing_) {
183 rx_lifetime_ = StartTracing(std::move(component_name));
184 }
Igor Murashkine54aebc2019-01-22 17:58:51 -0800185
186 break;
187 }
188 case Type::kIntentFailed:
Igor Murashkinf2e01062019-04-12 15:57:55 -0700189 if (allowed_tracing_) {
190 AbortTrace();
191 }
192 AbortReadAhead();
Igor Murashkine54aebc2019-01-22 17:58:51 -0800193 break;
194 case Type::kActivityLaunched: {
195 // Cancel tracing for warm/hot.
196 // Restart tracing if the activity was unexpected.
197
198 AppLaunchEvent::Temperature temperature = event.temperature;
199 if (temperature != AppLaunchEvent::Temperature::kCold) {
200 LOG(DEBUG) << "AppLaunchEventState#OnNewEvent aborting trace due to non-cold temperature";
Igor Murashkinf2e01062019-04-12 15:57:55 -0700201
202 if (allowed_tracing_) {
203 AbortTrace();
204 }
205 AbortReadAhead();
206 } else if (!IsTracing() || !IsReadAhead()) { // and the temperature is Cold.
Igor Murashkine54aebc2019-01-22 17:58:51 -0800207 // Start late trace when intent didn't have a component name
208 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent need to start new trace";
209
210 const std::string& title = event.activity_record_proto->identifier().title();
211 if (!AppComponentName::HasAppComponentName(title)) {
212 // Proto comment claim this is sometimes a window title.
213 // We need the actual 'package/component' here, so just ignore it if it's a title.
214 LOG(WARNING) << "App launched without a component name: " << event;
215 break;
216 }
217
218 AppComponentName component_name = AppComponentName::FromString(title);
219
220 component_name_ = component_name;
Igor Murashkinf2e01062019-04-12 15:57:55 -0700221
222 StartReadAhead(sequence_id_, component_name);
223 if (allowed_tracing_ && !IsTracing()) {
224 rx_lifetime_ = StartTracing(std::move(component_name));
225 }
Igor Murashkine54aebc2019-01-22 17:58:51 -0800226 } else {
227 // FIXME: match actual component name against intent component name.
228 // abort traces if they don't match.
229
Igor Murashkinf2e01062019-04-12 15:57:55 -0700230 if (allowed_tracing_) {
231 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already tracing";
232 }
233 LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already doing readahead";
Igor Murashkine54aebc2019-01-22 17:58:51 -0800234 }
235 break;
236 }
237 case Type::kActivityLaunchFinished:
238 // Finish tracing and collect trace buffer.
239 //
240 // TODO: this happens automatically when perfetto finishes its
241 // trace duration.
242 if (IsTracing()) {
243 MarkPendingTrace();
244 }
Igor Murashkinf2e01062019-04-12 15:57:55 -0700245 if (IsReadAhead()) {
246 FinishReadAhead();
247 }
Igor Murashkine54aebc2019-01-22 17:58:51 -0800248 break;
249 case Type::kActivityLaunchCancelled:
250 // Abort tracing.
Igor Murashkinf2e01062019-04-12 15:57:55 -0700251 if (allowed_tracing_) {
252 AbortTrace();
253 }
254 AbortReadAhead();
Igor Murashkine54aebc2019-01-22 17:58:51 -0800255 break;
256 default:
257 DCHECK(false) << "invalid type: " << event; // binder layer should've rejected this.
258 LOG(ERROR) << "invalid type: " << event; // binder layer should've rejected this.
259 }
260 }
261
Igor Murashkinf2e01062019-04-12 15:57:55 -0700262 // Is there an in-flight readahead task currently?
263 bool IsReadAhead() const {
264 return read_ahead_task_.has_value();
265 }
266
267 void StartReadAhead(size_t id, const AppComponentName& component_name) {
268 DCHECK(!IsReadAhead());
269
270 std::string file_path = "/data/misc/iorapd/";
271 file_path += component_name.ToUrlEncodedString();
272 file_path += ".compiled_trace.pb";
273
274 prefetcher::TaskId task{id, std::move(file_path)};
275 read_ahead_.BeginTask(task);
276 // TODO: non-void return signature?
277
278 read_ahead_task_ = std::move(task);
279 }
280
281 void FinishReadAhead() {
282 DCHECK(IsReadAhead());
283
284 read_ahead_.FinishTask(*read_ahead_task_);
285 read_ahead_task_ = std::nullopt;
286 }
287
288 void AbortReadAhead() {
289 FinishReadAhead();
290 }
291
Igor Murashkine54aebc2019-01-22 17:58:51 -0800292 bool IsTracing() const {
293 return is_tracing_;
294 }
295
296 rxcpp::composite_subscription StartTracing(AppComponentName component_name) {
Igor Murashkinf2e01062019-04-12 15:57:55 -0700297 DCHECK(allowed_tracing_);
Igor Murashkine54aebc2019-01-22 17:58:51 -0800298 DCHECK(!IsTracing());
299
300 auto /*observable<PerfettoStreamCommand>*/ perfetto_commands =
301 rxcpp::observable<>::just(PerfettoStreamCommand::kStartTracing)
302 // wait 1x
303 .concat(
304 // Pick a value longer than the perfetto config delay_ms, so that we send
305 // 'kShutdown' after tracing has already finished.
306 rxcpp::observable<>::interval(std::chrono::milliseconds(10000))
307 .take(2) // kStopTracing, kShutdown.
308 .map([](int value) {
309 // value is 1,2,3,...
310 return static_cast<PerfettoStreamCommand>(value); // 1,2, ...
311 })
312 );
313
314 auto /*observable<PerfettoTraceProto>*/ trace_proto_stream =
315 perfetto_factory_->CreateTraceStream(perfetto_commands);
316 // This immediately connects to perfetto asynchronously.
317 //
318 // TODO: create a perfetto handle earlier, to minimize perfetto startup latency.
319
320 rxcpp::composite_subscription lifetime;
321
322 trace_proto_stream
323 .tap([](const PerfettoTraceProto& trace_proto) {
324 LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (1)";
325 })
326 .observe_on(*thread_) // All work prior to 'observe_on' is handled on thread_.
327 .subscribe_on(*thread_) // All work prior to 'observe_on' is handled on thread_.
328 .observe_on(*io_thread_) // Write data on an idle-class-priority thread.
329 .tap([](const PerfettoTraceProto& trace_proto) {
330 LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (2)";
331 })
332 .as_blocking() // TODO: remove.
333 .subscribe(/*out*/lifetime,
334 /*on_next*/[component_name]
335 (PerfettoTraceProto trace_proto) {
336 std::string file_path = "/data/misc/iorapd/";
337 file_path += component_name.ToUrlEncodedString();
338 file_path += ".perfetto_trace.pb";
339
340 // TODO: timestamp each file into a subdirectory.
341
342 if (!trace_proto.WriteFullyToFile(file_path)) {
343 LOG(ERROR) << "Failed to save TraceBuffer to " << file_path;
344 } else {
345 LOG(INFO) << "Perfetto TraceBuffer saved to file: " << file_path;
346 }
347 },
348 /*on_error*/[](rxcpp::util::error_ptr err) {
349 LOG(ERROR) << "Perfetto trace proto collection error: " << rxcpp::util::what(err);
350 });
351
352 is_tracing_ = true;
353
354 return lifetime;
355 }
356
357 void AbortTrace() {
358 LOG(VERBOSE) << "AppLaunchEventState - AbortTrace";
Igor Murashkinf2e01062019-04-12 15:57:55 -0700359 DCHECK(allowed_tracing_);
360
Igor Murashkine54aebc2019-01-22 17:58:51 -0800361 is_tracing_ = false;
362 if (rx_lifetime_) {
363 // TODO: it would be good to call perfetto Destroy.
364
365 LOG(VERBOSE) << "AppLaunchEventState - AbortTrace - Unsubscribe";
366 rx_lifetime_->unsubscribe();
367 rx_lifetime_.reset();
368 }
369 }
370
371 void MarkPendingTrace() {
372 LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace";
Igor Murashkinf2e01062019-04-12 15:57:55 -0700373 DCHECK(allowed_tracing_);
Igor Murashkine54aebc2019-01-22 17:58:51 -0800374 DCHECK(is_tracing_);
375 DCHECK(rx_lifetime_.has_value());
376
377 if (rx_lifetime_) {
378 LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime moved";
379 // Don't unsubscribe because that would cause the perfetto TraceBuffer
380 // to get dropped on the floor.
381 //
382 // Instead, we want to let it finish and write it out to a file.
383 rx_in_flight_.push_back(*std::move(rx_lifetime_));
384 rx_lifetime_.reset();
385 } else {
386 LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime was empty";
387 }
388
389 // FIXME: how do we clear this vector?
390 }
391};
392
393// Convert callback pattern into reactive pattern.
394struct AppLaunchEventSubject {
395 using RefWrapper =
396 std::reference_wrapper<const AppLaunchEvent>;
397
398 AppLaunchEventSubject() {}
399
400 void Subscribe(rxcpp::subscriber<RefWrapper> subscriber) {
401 DCHECK(ready_ != true) << "Cannot Subscribe twice";
402
403 subscriber_ = std::move(subscriber);
404
405 // Release edge of synchronizes-with AcquireIsReady.
406 ready_.store(true);
407 }
408
409 void OnNext(const AppLaunchEvent& e) {
410 if (!AcquireIsReady()) {
411 return;
412 }
413
414 if (!subscriber_->is_subscribed()) {
415 return;
416 }
417
418 /*
419 * TODO: fix upstream.
420 *
421 * Rx workaround: this fails to compile when
422 * the observable is a reference type:
423 *
424 * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:354:18: error: multiple overloads of 'on_next' instantiate to the same signature 'void (const iorap::binder::AppLaunchEvent &) const'
425 * virtual void on_next(T&&) const {};
426 *
427 * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:353:18: note: previous declaration is here
428 * virtual void on_next(T&) const {};
429 *
430 * (The workaround is to use reference_wrapper instead
431 * of const AppLaunchEvent&)
432 */
433 subscriber_->on_next(std::cref(e));
434
435 }
436
437 void OnCompleted() {
438 if (!AcquireIsReady()) {
439 return;
440 }
441
442 subscriber_->on_completed();
443 }
444
445 private:
446 bool AcquireIsReady() {
447 // Synchronizes-with the release-edge in Subscribe.
448 // This can happen much later, only once the subscription actually happens.
449
450 // However, as far as I know, 'rxcpp::subscriber' is not thread safe,
451 // (but the observable chain itself can be made thread-safe via #observe_on, etc).
452 // so we must avoid reading it until it has been fully synchronized.
453 //
454 // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics,
455 // to make it simpler.
456 return ready_.load();
457 }
458
459 // TODO: also track the RequestId ?
460
461 std::atomic<bool> ready_{false};
462
463
464 std::optional<rxcpp::subscriber<RefWrapper>> subscriber_;
465};
466
Igor Murashkina128a862019-02-01 13:26:37 -0800467// Convert callback pattern into reactive pattern.
468struct JobScheduledEventSubject {
469 JobScheduledEventSubject() {}
470
471 void Subscribe(rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>> subscriber) {
472 DCHECK(ready_ != true) << "Cannot Subscribe twice";
473
474 subscriber_ = std::move(subscriber);
475
476 // Release edge of synchronizes-with AcquireIsReady.
477 ready_.store(true);
478 }
479
480 void OnNext(RequestId request_id, JobScheduledEvent e) {
481 if (!AcquireIsReady()) {
482 return;
483 }
484
485 if (!subscriber_->is_subscribed()) {
486 return;
487 }
488
489 subscriber_->on_next(std::pair<RequestId, JobScheduledEvent>{std::move(request_id), std::move(e)});
490
491 }
492
493 void OnCompleted() {
494 if (!AcquireIsReady()) {
495 return;
496 }
497
498 subscriber_->on_completed();
499 }
500
501 private:
502 bool AcquireIsReady() {
503 // Synchronizes-with the release-edge in Subscribe.
504 // This can happen much later, only once the subscription actually happens.
505
506 // However, as far as I know, 'rxcpp::subscriber' is not thread safe,
507 // (but the observable chain itself can be made thread-safe via #observe_on, etc).
508 // so we must avoid reading it until it has been fully synchronized.
509 //
510 // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics,
511 // to make it simpler.
512 return ready_.load();
513 }
514
515 // TODO: also track the RequestId ?
516
517 std::atomic<bool> ready_{false};
518
519 std::optional<rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>>> subscriber_;
520};
521
Igor Murashkine54aebc2019-01-22 17:58:51 -0800522class EventManager::Impl {
523 public:
524 Impl(/*borrow*/perfetto::RxProducerFactory& perfetto_factory)
525 : perfetto_factory_(perfetto_factory),
526 worker_thread_(rxcpp::observe_on_new_thread()),
527 worker_thread2_(rxcpp::observe_on_new_thread()),
528 io_thread_(perfetto::ObserveOnNewIoThread()) {
529
530 // TODO: read all properties from one config class.
531 tracing_allowed_ = ::android::base::GetBoolProperty("iorapd.perfetto.enable", /*default*/false);
532
Igor Murashkinf2e01062019-04-12 15:57:55 -0700533 rx_lifetime_ = InitializeRxGraph();
Igor Murashkina128a862019-02-01 13:26:37 -0800534 rx_lifetime_jobs_ = InitializeRxGraphForJobScheduledEvents();
535 }
536
537 void SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) {
538 DCHECK(callbacks_.expired());
539 callbacks_ = callbacks;
Igor Murashkine54aebc2019-01-22 17:58:51 -0800540 }
541
542 bool OnAppLaunchEvent(RequestId request_id,
543 const AppLaunchEvent& event) {
544 LOG(VERBOSE) << "EventManager::OnAppLaunchEvent("
545 << "request_id=" << request_id.request_id << ","
546 << event;
547
548 app_launch_event_subject_.OnNext(event);
549
550 return true;
551 }
552
Igor Murashkina128a862019-02-01 13:26:37 -0800553 bool OnJobScheduledEvent(RequestId request_id,
554 const JobScheduledEvent& event) {
555 LOG(VERBOSE) << "EventManager::OnJobScheduledEvent("
556 << "request_id=" << request_id.request_id << ",event=TODO).";
557
558 job_scheduled_event_subject_.OnNext(std::move(request_id), event);
559
560 return true; // No errors.
561 }
562
Igor Murashkine54aebc2019-01-22 17:58:51 -0800563 rxcpp::composite_subscription InitializeRxGraph() {
564 LOG(VERBOSE) << "EventManager::InitializeRxGraph";
565
566 app_launch_events_ = rxcpp::observable<>::create<AppLaunchEventRefWrapper>(
567 [&](rxcpp::subscriber<AppLaunchEventRefWrapper> subscriber) {
568 app_launch_event_subject_.Subscribe(std::move(subscriber));
569 });
570
571 rxcpp::composite_subscription lifetime;
572
Igor Murashkinf2e01062019-04-12 15:57:55 -0700573 if (!tracing_allowed_) {
574 LOG(WARNING) << "Tracing disabled by iorapd.perfetto.enable=false";
575 }
576
577 AppLaunchEventState initial_state{&perfetto_factory_,
578 tracing_allowed_,
579 &worker_thread2_,
580 &io_thread_};
Igor Murashkine54aebc2019-01-22 17:58:51 -0800581 app_launch_events_
582 .subscribe_on(worker_thread_)
583 .scan(std::move(initial_state),
584 [](AppLaunchEventState state, AppLaunchEventRefWrapper event) {
585 state.OnNewEvent(event.get());
586 return state;
587 })
588 .subscribe(/*out*/lifetime, [](const AppLaunchEventState& state) {
589 // Intentionally left blank.
590 (void)state;
591 });
592
593 return lifetime;
594 }
595
Igor Murashkina128a862019-02-01 13:26:37 -0800596 rxcpp::composite_subscription InitializeRxGraphForJobScheduledEvents() {
597 LOG(VERBOSE) << "EventManager::InitializeRxGraphForJobScheduledEvents";
598
599 using RequestAndJobEvent = std::pair<RequestId, JobScheduledEvent>;
600
601 job_scheduled_events_ = rxcpp::observable<>::create<RequestAndJobEvent>(
602 [&](rxcpp::subscriber<RequestAndJobEvent> subscriber) {
603 job_scheduled_event_subject_.Subscribe(std::move(subscriber));
604 });
605
606 rxcpp::composite_subscription lifetime;
607
608 job_scheduled_events_
609 .observe_on(worker_thread_) // async handling.
610 .tap([this](const RequestAndJobEvent& e) {
611 LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(1) - job begins";
612 this->NotifyProgress(e.first, TaskResult{TaskResult::State::kBegan});
613
614 // TODO: probably this shouldn't be emitted until most of the usual DCHECKs
615 // (for example, validate a job isn't already started, the request is not reused, etc).
616 // In this way we could block from the client until it sees 'kBegan' and Log.wtf otherwise.
617 })
618 .tap([](const RequestAndJobEvent& e) {
619 // TODO. Actual work.
620 LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(2) - job is being processed";
621
622 // TODO: abort functionality for in-flight jobs.
623 //
624 // maybe something like scan that returns an observable<Job> + flat map to that job.
625 // then we could unsubscribe from the scan to do a partial abort? need to try it and see if it works.
626 //
627 // other option is to create a new outer subscription for each job id which seems less ideal.
628 })
629 .subscribe(/*out*/lifetime,
630 /*on_next*/
631 [this](const RequestAndJobEvent& e) {
632 LOG(VERBOSE) << "EventManager#JobScheduledEvent#subscribe - job completed";
633 this->NotifyComplete(e.first, TaskResult{TaskResult::State::kCompleted});
634 }
635#if 0
636 ,
637 /*on_error*/
638 [](rxcpp::util::error_ptr err) {
639 LOG(ERROR) << "Scheduled job event failed: " << rxcpp::util::what(err);
640
641 //std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock();
642 //if (callbacks != nullptr) {
643 // FIXME: How do we get the request ID back out of the error? Seems like a problem.
644 // callbacks->OnComplete(, TaskResult{TaskResult::kError});
645 // We may have to wrap with an iorap::expected instead of using on_error.
646 //}
647
648 // FIXME: need to add a 'OnErrorResumeNext' operator?
649 DCHECK(false) << "forgot to implement OnErrorResumeNext";
650 }
651#endif
652 );
653
654 // TODO: error output should happen via an observable.
655
656 return lifetime;
657 }
658
659 void NotifyComplete(RequestId request_id, TaskResult result) {
660 std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock();
661 if (callbacks != nullptr) {
662 callbacks->OnComplete(std::move(request_id), std::move(result));
663 } else {
664 LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early";
665 }
666 }
667
668 void NotifyProgress(RequestId request_id, TaskResult result) {
669 std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock();
670 if (callbacks != nullptr) {
671 callbacks->OnProgress(std::move(request_id), std::move(result));
672 } else {
673 LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early";
674 }
675 }
676
Igor Murashkine54aebc2019-01-22 17:58:51 -0800677 perfetto::RxProducerFactory& perfetto_factory_;
678 bool tracing_allowed_{true};
679
Igor Murashkina128a862019-02-01 13:26:37 -0800680 std::weak_ptr<TaskResultCallbacks> callbacks_; // avoid cycles with weakptr.
681
Igor Murashkine54aebc2019-01-22 17:58:51 -0800682 using AppLaunchEventRefWrapper = AppLaunchEventSubject::RefWrapper;
683 rxcpp::observable<AppLaunchEventRefWrapper> app_launch_events_;
684 AppLaunchEventSubject app_launch_event_subject_;
685
Igor Murashkina128a862019-02-01 13:26:37 -0800686 rxcpp::observable<std::pair<RequestId, JobScheduledEvent>> job_scheduled_events_;
687 JobScheduledEventSubject job_scheduled_event_subject_;
688
Igor Murashkine54aebc2019-01-22 17:58:51 -0800689 rxcpp::observable<RequestId> completed_requests_;
690
691 // regular-priority thread to handle binder callbacks.
692 observe_on_one_worker worker_thread_;
693 observe_on_one_worker worker_thread2_;
694 // low priority idle-class thread for IO operations.
695 observe_on_one_worker io_thread_;
696
Igor Murashkina128a862019-02-01 13:26:37 -0800697 rxcpp::composite_subscription rx_lifetime_; // app launch events
698 rxcpp::composite_subscription rx_lifetime_jobs_; // job scheduled events
Igor Murashkine54aebc2019-01-22 17:58:51 -0800699
700//INTENTIONAL_COMPILER_ERROR_HERE:
701 // FIXME:
702 // ok so we want to expose a 'BlockingSubscribe' or a 'Subscribe' or some kind of function
703 // that the main thread can call. This would subscribe on all the observables we internally
704 // have here (probably on an event-manager-dedicated thread for simplicity).
705 //
706 // ideally we'd just reuse the binder thread to handle the events but I'm not super sure,
707 // maybe this already works with the identity_current_thread coordination?
708};
709using Impl = EventManager::Impl;
710
711EventManager::EventManager(perfetto::RxProducerFactory& perfetto_factory)
712 : impl_(new Impl(perfetto_factory)) {}
713
714std::shared_ptr<EventManager> EventManager::Create() {
715 static perfetto::PerfettoDependencies::Injector injector{
716 perfetto::PerfettoDependencies::CreateComponent
717 };
718 static perfetto::RxProducerFactory producer_factory{
719 /*borrow*/injector
720 };
721 return EventManager::Create(/*borrow*/producer_factory);
722}
723
724std::shared_ptr<EventManager> EventManager::Create(perfetto::RxProducerFactory& perfetto_factory) {
725 std::shared_ptr<EventManager> p{new EventManager{/*borrow*/perfetto_factory}};
726 return p;
727}
728
Igor Murashkina128a862019-02-01 13:26:37 -0800729void EventManager::SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) {
730 return impl_->SetTaskResultCallbacks(std::move(callbacks));
731}
732
Igor Murashkine54aebc2019-01-22 17:58:51 -0800733bool EventManager::OnAppLaunchEvent(RequestId request_id,
734 const AppLaunchEvent& event) {
735 return impl_->OnAppLaunchEvent(request_id, event);
736}
737
Igor Murashkina128a862019-02-01 13:26:37 -0800738bool EventManager::OnJobScheduledEvent(RequestId request_id,
739 const JobScheduledEvent& event) {
740 return impl_->OnJobScheduledEvent(request_id, event);
741}
742
Igor Murashkine54aebc2019-01-22 17:58:51 -0800743} // namespace iorap::manager