Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (C) 2019 The Android Open Source Project |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #include "common/debug.h" |
| 18 | #include "common/expected.h" |
| 19 | #include "manager/event_manager.h" |
| 20 | #include "perfetto/rx_producer.h" |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 21 | #include "prefetcher/read_ahead.h" |
| 22 | #include "prefetcher/task_id.h" |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 23 | |
| 24 | #include <android-base/properties.h> |
| 25 | #include <rxcpp/rx.hpp> |
| 26 | |
| 27 | #include <atomic> |
| 28 | #include <functional> |
| 29 | |
| 30 | using rxcpp::observe_on_one_worker; |
| 31 | |
| 32 | namespace iorap::manager { |
| 33 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 34 | using binder::AppLaunchEvent; |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 35 | using binder::JobScheduledEvent; |
| 36 | using binder::RequestId; |
| 37 | using binder::TaskResult; |
| 38 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 39 | using perfetto::PerfettoStreamCommand; |
| 40 | using perfetto::PerfettoTraceProto; |
| 41 | |
| 42 | struct AppComponentName { |
| 43 | std::string package; |
| 44 | std::string activity_name; |
| 45 | |
| 46 | static bool HasAppComponentName(const std::string& s) { |
| 47 | return s.find('/') != std::string::npos; |
| 48 | } |
| 49 | |
| 50 | // "com.foo.bar/.A" -> {"com.foo.bar", ".A"} |
| 51 | static AppComponentName FromString(const std::string& s) { |
| 52 | constexpr const char delimiter = '/'; |
| 53 | std::string package = s.substr(0, delimiter); |
| 54 | |
| 55 | std::string activity_name = s; |
| 56 | activity_name.erase(0, s.find(delimiter) + sizeof(delimiter)); |
| 57 | |
| 58 | return {std::move(package), std::move(activity_name)}; |
| 59 | } |
| 60 | |
| 61 | // {"com.foo.bar", ".A"} -> "com.foo.bar/.A" |
| 62 | std::string ToString() const { |
| 63 | return package + "/" + activity_name; |
| 64 | } |
| 65 | |
| 66 | /* |
| 67 | * '/' is encoded into %2F |
| 68 | * '%' is encoded into %25 |
| 69 | * |
| 70 | * This allows the component name to be be used as a file name |
| 71 | * ('/' is illegal due to being a path separator) with minimal |
| 72 | * munging. |
| 73 | */ |
| 74 | |
| 75 | // "com.foo.bar%2F.A%25" -> {"com.foo.bar", ".A%"} |
| 76 | static AppComponentName FromUrlEncodedString(const std::string& s) { |
| 77 | std::string cpy = s; |
| 78 | Replace(cpy, "%2F", "/"); |
| 79 | Replace(cpy, "%25", "%"); |
| 80 | |
| 81 | return FromString(cpy); |
| 82 | } |
| 83 | |
| 84 | // {"com.foo.bar", ".A%"} -> "com.foo.bar%2F.A%25" |
| 85 | std::string ToUrlEncodedString() const { |
| 86 | std::string s = ToString(); |
| 87 | Replace(s, "%", "%25"); |
| 88 | Replace(s, "/", "%2F"); |
| 89 | return s; |
| 90 | } |
| 91 | |
| 92 | private: |
| 93 | static bool Replace(std::string& str, const std::string& from, const std::string& to) { |
| 94 | // TODO: call in a loop to replace all occurrences, not just the first one. |
| 95 | const size_t start_pos = str.find(from); |
| 96 | if (start_pos == std::string::npos) { |
| 97 | return false; |
| 98 | } |
| 99 | |
| 100 | str.replace(start_pos, from.length(), to); |
| 101 | |
| 102 | return true; |
| 103 | } |
| 104 | }; |
| 105 | |
| 106 | std::ostream& operator<<(std::ostream& os, const AppComponentName& name) { |
| 107 | os << name.ToString(); |
| 108 | return os; |
| 109 | } |
| 110 | |
| 111 | // Main logic of the #OnAppLaunchEvent scan method. |
| 112 | // |
| 113 | // All functions are called from the same thread as the event manager |
| 114 | // functions. |
| 115 | // |
| 116 | // This is a data type, it's moved (std::move) around from one iteration |
| 117 | // of #scan to another. |
| 118 | struct AppLaunchEventState { |
| 119 | std::optional<AppComponentName> component_name_; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 120 | // Sequence ID is shared amongst the same app launch sequence, |
| 121 | // but changes whenever a new app launch sequence begins. |
| 122 | size_t sequence_id_ = static_cast<size_t>(-1); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 123 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 124 | prefetcher::ReadAhead read_ahead_; |
| 125 | bool is_read_ahead_{false}; |
| 126 | std::optional<prefetcher::TaskId> read_ahead_task_; |
| 127 | |
| 128 | bool allowed_tracing_{true}; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 129 | bool is_tracing_{false}; |
| 130 | std::optional<rxcpp::composite_subscription> rx_lifetime_; |
| 131 | std::vector<rxcpp::composite_subscription> rx_in_flight_; |
| 132 | |
| 133 | borrowed<perfetto::RxProducerFactory*> perfetto_factory_; // not null |
| 134 | borrowed<observe_on_one_worker*> thread_; // not null |
| 135 | borrowed<observe_on_one_worker*> io_thread_; // not null |
| 136 | |
| 137 | explicit AppLaunchEventState(borrowed<perfetto::RxProducerFactory*> perfetto_factory, |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 138 | bool allowed_tracing, |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 139 | borrowed<observe_on_one_worker*> thread, |
| 140 | borrowed<observe_on_one_worker*> io_thread) { |
| 141 | perfetto_factory_ = perfetto_factory; |
| 142 | DCHECK(perfetto_factory_ != nullptr); |
| 143 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 144 | allowed_tracing_ = allowed_tracing; |
| 145 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 146 | thread_ = thread; |
| 147 | DCHECK(thread_ != nullptr); |
| 148 | |
| 149 | io_thread_ = io_thread; |
| 150 | DCHECK(io_thread_ != nullptr); |
| 151 | } |
| 152 | |
| 153 | // Updates the values in this struct only as a side effect. |
| 154 | // |
| 155 | // May create and fire a new rx chain on the same threads as passed |
| 156 | // in by the constructors. |
| 157 | void OnNewEvent(const AppLaunchEvent& event) { |
| 158 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: " << event; |
| 159 | |
| 160 | using Type = AppLaunchEvent::Type; |
| 161 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 162 | DCHECK_GE(event.sequence_id, 0); |
| 163 | sequence_id_ = static_cast<size_t>(event.sequence_id); |
| 164 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 165 | switch (event.type) { |
| 166 | case Type::kIntentStarted: { |
| 167 | DCHECK(!IsTracing()); |
| 168 | // Optimistically start tracing if we have the activity in the intent. |
| 169 | if (!event.intent_proto->has_component()) { |
| 170 | // Can't do anything if there is no component in the proto. |
| 171 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: no component, can't trace"; |
| 172 | break; |
| 173 | } |
| 174 | |
| 175 | const std::string& package_name = event.intent_proto->component().package_name(); |
| 176 | const std::string& class_name = event.intent_proto->component().class_name(); |
| 177 | AppComponentName component_name{package_name, class_name}; |
| 178 | |
| 179 | component_name_ = component_name; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 180 | |
| 181 | StartReadAhead(sequence_id_, component_name); |
| 182 | if (allowed_tracing_) { |
| 183 | rx_lifetime_ = StartTracing(std::move(component_name)); |
| 184 | } |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 185 | |
| 186 | break; |
| 187 | } |
| 188 | case Type::kIntentFailed: |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 189 | if (allowed_tracing_) { |
| 190 | AbortTrace(); |
| 191 | } |
| 192 | AbortReadAhead(); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 193 | break; |
| 194 | case Type::kActivityLaunched: { |
| 195 | // Cancel tracing for warm/hot. |
| 196 | // Restart tracing if the activity was unexpected. |
| 197 | |
| 198 | AppLaunchEvent::Temperature temperature = event.temperature; |
| 199 | if (temperature != AppLaunchEvent::Temperature::kCold) { |
| 200 | LOG(DEBUG) << "AppLaunchEventState#OnNewEvent aborting trace due to non-cold temperature"; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 201 | |
| 202 | if (allowed_tracing_) { |
| 203 | AbortTrace(); |
| 204 | } |
| 205 | AbortReadAhead(); |
| 206 | } else if (!IsTracing() || !IsReadAhead()) { // and the temperature is Cold. |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 207 | // Start late trace when intent didn't have a component name |
| 208 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent need to start new trace"; |
| 209 | |
| 210 | const std::string& title = event.activity_record_proto->identifier().title(); |
| 211 | if (!AppComponentName::HasAppComponentName(title)) { |
| 212 | // Proto comment claim this is sometimes a window title. |
| 213 | // We need the actual 'package/component' here, so just ignore it if it's a title. |
| 214 | LOG(WARNING) << "App launched without a component name: " << event; |
| 215 | break; |
| 216 | } |
| 217 | |
| 218 | AppComponentName component_name = AppComponentName::FromString(title); |
| 219 | |
| 220 | component_name_ = component_name; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 221 | |
| 222 | StartReadAhead(sequence_id_, component_name); |
| 223 | if (allowed_tracing_ && !IsTracing()) { |
| 224 | rx_lifetime_ = StartTracing(std::move(component_name)); |
| 225 | } |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 226 | } else { |
| 227 | // FIXME: match actual component name against intent component name. |
| 228 | // abort traces if they don't match. |
| 229 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 230 | if (allowed_tracing_) { |
| 231 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already tracing"; |
| 232 | } |
| 233 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already doing readahead"; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 234 | } |
| 235 | break; |
| 236 | } |
| 237 | case Type::kActivityLaunchFinished: |
| 238 | // Finish tracing and collect trace buffer. |
| 239 | // |
| 240 | // TODO: this happens automatically when perfetto finishes its |
| 241 | // trace duration. |
| 242 | if (IsTracing()) { |
| 243 | MarkPendingTrace(); |
| 244 | } |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 245 | if (IsReadAhead()) { |
| 246 | FinishReadAhead(); |
| 247 | } |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 248 | break; |
| 249 | case Type::kActivityLaunchCancelled: |
| 250 | // Abort tracing. |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 251 | if (allowed_tracing_) { |
| 252 | AbortTrace(); |
| 253 | } |
| 254 | AbortReadAhead(); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 255 | break; |
| 256 | default: |
| 257 | DCHECK(false) << "invalid type: " << event; // binder layer should've rejected this. |
| 258 | LOG(ERROR) << "invalid type: " << event; // binder layer should've rejected this. |
| 259 | } |
| 260 | } |
| 261 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 262 | // Is there an in-flight readahead task currently? |
| 263 | bool IsReadAhead() const { |
| 264 | return read_ahead_task_.has_value(); |
| 265 | } |
| 266 | |
| 267 | void StartReadAhead(size_t id, const AppComponentName& component_name) { |
| 268 | DCHECK(!IsReadAhead()); |
| 269 | |
| 270 | std::string file_path = "/data/misc/iorapd/"; |
| 271 | file_path += component_name.ToUrlEncodedString(); |
| 272 | file_path += ".compiled_trace.pb"; |
| 273 | |
| 274 | prefetcher::TaskId task{id, std::move(file_path)}; |
| 275 | read_ahead_.BeginTask(task); |
| 276 | // TODO: non-void return signature? |
| 277 | |
| 278 | read_ahead_task_ = std::move(task); |
| 279 | } |
| 280 | |
| 281 | void FinishReadAhead() { |
| 282 | DCHECK(IsReadAhead()); |
| 283 | |
| 284 | read_ahead_.FinishTask(*read_ahead_task_); |
| 285 | read_ahead_task_ = std::nullopt; |
| 286 | } |
| 287 | |
| 288 | void AbortReadAhead() { |
| 289 | FinishReadAhead(); |
| 290 | } |
| 291 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 292 | bool IsTracing() const { |
| 293 | return is_tracing_; |
| 294 | } |
| 295 | |
| 296 | rxcpp::composite_subscription StartTracing(AppComponentName component_name) { |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 297 | DCHECK(allowed_tracing_); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 298 | DCHECK(!IsTracing()); |
| 299 | |
| 300 | auto /*observable<PerfettoStreamCommand>*/ perfetto_commands = |
| 301 | rxcpp::observable<>::just(PerfettoStreamCommand::kStartTracing) |
| 302 | // wait 1x |
| 303 | .concat( |
| 304 | // Pick a value longer than the perfetto config delay_ms, so that we send |
| 305 | // 'kShutdown' after tracing has already finished. |
| 306 | rxcpp::observable<>::interval(std::chrono::milliseconds(10000)) |
| 307 | .take(2) // kStopTracing, kShutdown. |
| 308 | .map([](int value) { |
| 309 | // value is 1,2,3,... |
| 310 | return static_cast<PerfettoStreamCommand>(value); // 1,2, ... |
| 311 | }) |
| 312 | ); |
| 313 | |
| 314 | auto /*observable<PerfettoTraceProto>*/ trace_proto_stream = |
| 315 | perfetto_factory_->CreateTraceStream(perfetto_commands); |
| 316 | // This immediately connects to perfetto asynchronously. |
| 317 | // |
| 318 | // TODO: create a perfetto handle earlier, to minimize perfetto startup latency. |
| 319 | |
| 320 | rxcpp::composite_subscription lifetime; |
| 321 | |
| 322 | trace_proto_stream |
| 323 | .tap([](const PerfettoTraceProto& trace_proto) { |
| 324 | LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (1)"; |
| 325 | }) |
| 326 | .observe_on(*thread_) // All work prior to 'observe_on' is handled on thread_. |
| 327 | .subscribe_on(*thread_) // All work prior to 'observe_on' is handled on thread_. |
| 328 | .observe_on(*io_thread_) // Write data on an idle-class-priority thread. |
| 329 | .tap([](const PerfettoTraceProto& trace_proto) { |
| 330 | LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (2)"; |
| 331 | }) |
| 332 | .as_blocking() // TODO: remove. |
| 333 | .subscribe(/*out*/lifetime, |
| 334 | /*on_next*/[component_name] |
| 335 | (PerfettoTraceProto trace_proto) { |
| 336 | std::string file_path = "/data/misc/iorapd/"; |
| 337 | file_path += component_name.ToUrlEncodedString(); |
| 338 | file_path += ".perfetto_trace.pb"; |
| 339 | |
| 340 | // TODO: timestamp each file into a subdirectory. |
| 341 | |
| 342 | if (!trace_proto.WriteFullyToFile(file_path)) { |
| 343 | LOG(ERROR) << "Failed to save TraceBuffer to " << file_path; |
| 344 | } else { |
| 345 | LOG(INFO) << "Perfetto TraceBuffer saved to file: " << file_path; |
| 346 | } |
| 347 | }, |
| 348 | /*on_error*/[](rxcpp::util::error_ptr err) { |
| 349 | LOG(ERROR) << "Perfetto trace proto collection error: " << rxcpp::util::what(err); |
| 350 | }); |
| 351 | |
| 352 | is_tracing_ = true; |
| 353 | |
| 354 | return lifetime; |
| 355 | } |
| 356 | |
| 357 | void AbortTrace() { |
| 358 | LOG(VERBOSE) << "AppLaunchEventState - AbortTrace"; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 359 | DCHECK(allowed_tracing_); |
| 360 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 361 | is_tracing_ = false; |
| 362 | if (rx_lifetime_) { |
| 363 | // TODO: it would be good to call perfetto Destroy. |
| 364 | |
| 365 | LOG(VERBOSE) << "AppLaunchEventState - AbortTrace - Unsubscribe"; |
| 366 | rx_lifetime_->unsubscribe(); |
| 367 | rx_lifetime_.reset(); |
| 368 | } |
| 369 | } |
| 370 | |
| 371 | void MarkPendingTrace() { |
| 372 | LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace"; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 373 | DCHECK(allowed_tracing_); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 374 | DCHECK(is_tracing_); |
| 375 | DCHECK(rx_lifetime_.has_value()); |
| 376 | |
| 377 | if (rx_lifetime_) { |
| 378 | LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime moved"; |
| 379 | // Don't unsubscribe because that would cause the perfetto TraceBuffer |
| 380 | // to get dropped on the floor. |
| 381 | // |
| 382 | // Instead, we want to let it finish and write it out to a file. |
| 383 | rx_in_flight_.push_back(*std::move(rx_lifetime_)); |
| 384 | rx_lifetime_.reset(); |
| 385 | } else { |
| 386 | LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime was empty"; |
| 387 | } |
| 388 | |
| 389 | // FIXME: how do we clear this vector? |
| 390 | } |
| 391 | }; |
| 392 | |
| 393 | // Convert callback pattern into reactive pattern. |
| 394 | struct AppLaunchEventSubject { |
| 395 | using RefWrapper = |
| 396 | std::reference_wrapper<const AppLaunchEvent>; |
| 397 | |
| 398 | AppLaunchEventSubject() {} |
| 399 | |
| 400 | void Subscribe(rxcpp::subscriber<RefWrapper> subscriber) { |
| 401 | DCHECK(ready_ != true) << "Cannot Subscribe twice"; |
| 402 | |
| 403 | subscriber_ = std::move(subscriber); |
| 404 | |
| 405 | // Release edge of synchronizes-with AcquireIsReady. |
| 406 | ready_.store(true); |
| 407 | } |
| 408 | |
| 409 | void OnNext(const AppLaunchEvent& e) { |
| 410 | if (!AcquireIsReady()) { |
| 411 | return; |
| 412 | } |
| 413 | |
| 414 | if (!subscriber_->is_subscribed()) { |
| 415 | return; |
| 416 | } |
| 417 | |
| 418 | /* |
| 419 | * TODO: fix upstream. |
| 420 | * |
| 421 | * Rx workaround: this fails to compile when |
| 422 | * the observable is a reference type: |
| 423 | * |
| 424 | * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:354:18: error: multiple overloads of 'on_next' instantiate to the same signature 'void (const iorap::binder::AppLaunchEvent &) const' |
| 425 | * virtual void on_next(T&&) const {}; |
| 426 | * |
| 427 | * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:353:18: note: previous declaration is here |
| 428 | * virtual void on_next(T&) const {}; |
| 429 | * |
| 430 | * (The workaround is to use reference_wrapper instead |
| 431 | * of const AppLaunchEvent&) |
| 432 | */ |
| 433 | subscriber_->on_next(std::cref(e)); |
| 434 | |
| 435 | } |
| 436 | |
| 437 | void OnCompleted() { |
| 438 | if (!AcquireIsReady()) { |
| 439 | return; |
| 440 | } |
| 441 | |
| 442 | subscriber_->on_completed(); |
| 443 | } |
| 444 | |
| 445 | private: |
| 446 | bool AcquireIsReady() { |
| 447 | // Synchronizes-with the release-edge in Subscribe. |
| 448 | // This can happen much later, only once the subscription actually happens. |
| 449 | |
| 450 | // However, as far as I know, 'rxcpp::subscriber' is not thread safe, |
| 451 | // (but the observable chain itself can be made thread-safe via #observe_on, etc). |
| 452 | // so we must avoid reading it until it has been fully synchronized. |
| 453 | // |
| 454 | // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics, |
| 455 | // to make it simpler. |
| 456 | return ready_.load(); |
| 457 | } |
| 458 | |
| 459 | // TODO: also track the RequestId ? |
| 460 | |
| 461 | std::atomic<bool> ready_{false}; |
| 462 | |
| 463 | |
| 464 | std::optional<rxcpp::subscriber<RefWrapper>> subscriber_; |
| 465 | }; |
| 466 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 467 | // Convert callback pattern into reactive pattern. |
| 468 | struct JobScheduledEventSubject { |
| 469 | JobScheduledEventSubject() {} |
| 470 | |
| 471 | void Subscribe(rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>> subscriber) { |
| 472 | DCHECK(ready_ != true) << "Cannot Subscribe twice"; |
| 473 | |
| 474 | subscriber_ = std::move(subscriber); |
| 475 | |
| 476 | // Release edge of synchronizes-with AcquireIsReady. |
| 477 | ready_.store(true); |
| 478 | } |
| 479 | |
| 480 | void OnNext(RequestId request_id, JobScheduledEvent e) { |
| 481 | if (!AcquireIsReady()) { |
| 482 | return; |
| 483 | } |
| 484 | |
| 485 | if (!subscriber_->is_subscribed()) { |
| 486 | return; |
| 487 | } |
| 488 | |
| 489 | subscriber_->on_next(std::pair<RequestId, JobScheduledEvent>{std::move(request_id), std::move(e)}); |
| 490 | |
| 491 | } |
| 492 | |
| 493 | void OnCompleted() { |
| 494 | if (!AcquireIsReady()) { |
| 495 | return; |
| 496 | } |
| 497 | |
| 498 | subscriber_->on_completed(); |
| 499 | } |
| 500 | |
| 501 | private: |
| 502 | bool AcquireIsReady() { |
| 503 | // Synchronizes-with the release-edge in Subscribe. |
| 504 | // This can happen much later, only once the subscription actually happens. |
| 505 | |
| 506 | // However, as far as I know, 'rxcpp::subscriber' is not thread safe, |
| 507 | // (but the observable chain itself can be made thread-safe via #observe_on, etc). |
| 508 | // so we must avoid reading it until it has been fully synchronized. |
| 509 | // |
| 510 | // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics, |
| 511 | // to make it simpler. |
| 512 | return ready_.load(); |
| 513 | } |
| 514 | |
| 515 | // TODO: also track the RequestId ? |
| 516 | |
| 517 | std::atomic<bool> ready_{false}; |
| 518 | |
| 519 | std::optional<rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>>> subscriber_; |
| 520 | }; |
| 521 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 522 | class EventManager::Impl { |
| 523 | public: |
| 524 | Impl(/*borrow*/perfetto::RxProducerFactory& perfetto_factory) |
| 525 | : perfetto_factory_(perfetto_factory), |
| 526 | worker_thread_(rxcpp::observe_on_new_thread()), |
| 527 | worker_thread2_(rxcpp::observe_on_new_thread()), |
| 528 | io_thread_(perfetto::ObserveOnNewIoThread()) { |
| 529 | |
| 530 | // TODO: read all properties from one config class. |
| 531 | tracing_allowed_ = ::android::base::GetBoolProperty("iorapd.perfetto.enable", /*default*/false); |
| 532 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 533 | rx_lifetime_ = InitializeRxGraph(); |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 534 | rx_lifetime_jobs_ = InitializeRxGraphForJobScheduledEvents(); |
| 535 | } |
| 536 | |
| 537 | void SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) { |
| 538 | DCHECK(callbacks_.expired()); |
| 539 | callbacks_ = callbacks; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 540 | } |
| 541 | |
| 542 | bool OnAppLaunchEvent(RequestId request_id, |
| 543 | const AppLaunchEvent& event) { |
| 544 | LOG(VERBOSE) << "EventManager::OnAppLaunchEvent(" |
| 545 | << "request_id=" << request_id.request_id << "," |
| 546 | << event; |
| 547 | |
| 548 | app_launch_event_subject_.OnNext(event); |
| 549 | |
| 550 | return true; |
| 551 | } |
| 552 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 553 | bool OnJobScheduledEvent(RequestId request_id, |
| 554 | const JobScheduledEvent& event) { |
| 555 | LOG(VERBOSE) << "EventManager::OnJobScheduledEvent(" |
| 556 | << "request_id=" << request_id.request_id << ",event=TODO)."; |
| 557 | |
| 558 | job_scheduled_event_subject_.OnNext(std::move(request_id), event); |
| 559 | |
| 560 | return true; // No errors. |
| 561 | } |
| 562 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 563 | rxcpp::composite_subscription InitializeRxGraph() { |
| 564 | LOG(VERBOSE) << "EventManager::InitializeRxGraph"; |
| 565 | |
| 566 | app_launch_events_ = rxcpp::observable<>::create<AppLaunchEventRefWrapper>( |
| 567 | [&](rxcpp::subscriber<AppLaunchEventRefWrapper> subscriber) { |
| 568 | app_launch_event_subject_.Subscribe(std::move(subscriber)); |
| 569 | }); |
| 570 | |
| 571 | rxcpp::composite_subscription lifetime; |
| 572 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame^] | 573 | if (!tracing_allowed_) { |
| 574 | LOG(WARNING) << "Tracing disabled by iorapd.perfetto.enable=false"; |
| 575 | } |
| 576 | |
| 577 | AppLaunchEventState initial_state{&perfetto_factory_, |
| 578 | tracing_allowed_, |
| 579 | &worker_thread2_, |
| 580 | &io_thread_}; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 581 | app_launch_events_ |
| 582 | .subscribe_on(worker_thread_) |
| 583 | .scan(std::move(initial_state), |
| 584 | [](AppLaunchEventState state, AppLaunchEventRefWrapper event) { |
| 585 | state.OnNewEvent(event.get()); |
| 586 | return state; |
| 587 | }) |
| 588 | .subscribe(/*out*/lifetime, [](const AppLaunchEventState& state) { |
| 589 | // Intentionally left blank. |
| 590 | (void)state; |
| 591 | }); |
| 592 | |
| 593 | return lifetime; |
| 594 | } |
| 595 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 596 | rxcpp::composite_subscription InitializeRxGraphForJobScheduledEvents() { |
| 597 | LOG(VERBOSE) << "EventManager::InitializeRxGraphForJobScheduledEvents"; |
| 598 | |
| 599 | using RequestAndJobEvent = std::pair<RequestId, JobScheduledEvent>; |
| 600 | |
| 601 | job_scheduled_events_ = rxcpp::observable<>::create<RequestAndJobEvent>( |
| 602 | [&](rxcpp::subscriber<RequestAndJobEvent> subscriber) { |
| 603 | job_scheduled_event_subject_.Subscribe(std::move(subscriber)); |
| 604 | }); |
| 605 | |
| 606 | rxcpp::composite_subscription lifetime; |
| 607 | |
| 608 | job_scheduled_events_ |
| 609 | .observe_on(worker_thread_) // async handling. |
| 610 | .tap([this](const RequestAndJobEvent& e) { |
| 611 | LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(1) - job begins"; |
| 612 | this->NotifyProgress(e.first, TaskResult{TaskResult::State::kBegan}); |
| 613 | |
| 614 | // TODO: probably this shouldn't be emitted until most of the usual DCHECKs |
| 615 | // (for example, validate a job isn't already started, the request is not reused, etc). |
| 616 | // In this way we could block from the client until it sees 'kBegan' and Log.wtf otherwise. |
| 617 | }) |
| 618 | .tap([](const RequestAndJobEvent& e) { |
| 619 | // TODO. Actual work. |
| 620 | LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(2) - job is being processed"; |
| 621 | |
| 622 | // TODO: abort functionality for in-flight jobs. |
| 623 | // |
| 624 | // maybe something like scan that returns an observable<Job> + flat map to that job. |
| 625 | // then we could unsubscribe from the scan to do a partial abort? need to try it and see if it works. |
| 626 | // |
| 627 | // other option is to create a new outer subscription for each job id which seems less ideal. |
| 628 | }) |
| 629 | .subscribe(/*out*/lifetime, |
| 630 | /*on_next*/ |
| 631 | [this](const RequestAndJobEvent& e) { |
| 632 | LOG(VERBOSE) << "EventManager#JobScheduledEvent#subscribe - job completed"; |
| 633 | this->NotifyComplete(e.first, TaskResult{TaskResult::State::kCompleted}); |
| 634 | } |
| 635 | #if 0 |
| 636 | , |
| 637 | /*on_error*/ |
| 638 | [](rxcpp::util::error_ptr err) { |
| 639 | LOG(ERROR) << "Scheduled job event failed: " << rxcpp::util::what(err); |
| 640 | |
| 641 | //std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock(); |
| 642 | //if (callbacks != nullptr) { |
| 643 | // FIXME: How do we get the request ID back out of the error? Seems like a problem. |
| 644 | // callbacks->OnComplete(, TaskResult{TaskResult::kError}); |
| 645 | // We may have to wrap with an iorap::expected instead of using on_error. |
| 646 | //} |
| 647 | |
| 648 | // FIXME: need to add a 'OnErrorResumeNext' operator? |
| 649 | DCHECK(false) << "forgot to implement OnErrorResumeNext"; |
| 650 | } |
| 651 | #endif |
| 652 | ); |
| 653 | |
| 654 | // TODO: error output should happen via an observable. |
| 655 | |
| 656 | return lifetime; |
| 657 | } |
| 658 | |
| 659 | void NotifyComplete(RequestId request_id, TaskResult result) { |
| 660 | std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock(); |
| 661 | if (callbacks != nullptr) { |
| 662 | callbacks->OnComplete(std::move(request_id), std::move(result)); |
| 663 | } else { |
| 664 | LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early"; |
| 665 | } |
| 666 | } |
| 667 | |
| 668 | void NotifyProgress(RequestId request_id, TaskResult result) { |
| 669 | std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock(); |
| 670 | if (callbacks != nullptr) { |
| 671 | callbacks->OnProgress(std::move(request_id), std::move(result)); |
| 672 | } else { |
| 673 | LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early"; |
| 674 | } |
| 675 | } |
| 676 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 677 | perfetto::RxProducerFactory& perfetto_factory_; |
| 678 | bool tracing_allowed_{true}; |
| 679 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 680 | std::weak_ptr<TaskResultCallbacks> callbacks_; // avoid cycles with weakptr. |
| 681 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 682 | using AppLaunchEventRefWrapper = AppLaunchEventSubject::RefWrapper; |
| 683 | rxcpp::observable<AppLaunchEventRefWrapper> app_launch_events_; |
| 684 | AppLaunchEventSubject app_launch_event_subject_; |
| 685 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 686 | rxcpp::observable<std::pair<RequestId, JobScheduledEvent>> job_scheduled_events_; |
| 687 | JobScheduledEventSubject job_scheduled_event_subject_; |
| 688 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 689 | rxcpp::observable<RequestId> completed_requests_; |
| 690 | |
| 691 | // regular-priority thread to handle binder callbacks. |
| 692 | observe_on_one_worker worker_thread_; |
| 693 | observe_on_one_worker worker_thread2_; |
| 694 | // low priority idle-class thread for IO operations. |
| 695 | observe_on_one_worker io_thread_; |
| 696 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 697 | rxcpp::composite_subscription rx_lifetime_; // app launch events |
| 698 | rxcpp::composite_subscription rx_lifetime_jobs_; // job scheduled events |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 699 | |
| 700 | //INTENTIONAL_COMPILER_ERROR_HERE: |
| 701 | // FIXME: |
| 702 | // ok so we want to expose a 'BlockingSubscribe' or a 'Subscribe' or some kind of function |
| 703 | // that the main thread can call. This would subscribe on all the observables we internally |
| 704 | // have here (probably on an event-manager-dedicated thread for simplicity). |
| 705 | // |
| 706 | // ideally we'd just reuse the binder thread to handle the events but I'm not super sure, |
| 707 | // maybe this already works with the identity_current_thread coordination? |
| 708 | }; |
| 709 | using Impl = EventManager::Impl; |
| 710 | |
| 711 | EventManager::EventManager(perfetto::RxProducerFactory& perfetto_factory) |
| 712 | : impl_(new Impl(perfetto_factory)) {} |
| 713 | |
| 714 | std::shared_ptr<EventManager> EventManager::Create() { |
| 715 | static perfetto::PerfettoDependencies::Injector injector{ |
| 716 | perfetto::PerfettoDependencies::CreateComponent |
| 717 | }; |
| 718 | static perfetto::RxProducerFactory producer_factory{ |
| 719 | /*borrow*/injector |
| 720 | }; |
| 721 | return EventManager::Create(/*borrow*/producer_factory); |
| 722 | } |
| 723 | |
| 724 | std::shared_ptr<EventManager> EventManager::Create(perfetto::RxProducerFactory& perfetto_factory) { |
| 725 | std::shared_ptr<EventManager> p{new EventManager{/*borrow*/perfetto_factory}}; |
| 726 | return p; |
| 727 | } |
| 728 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 729 | void EventManager::SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) { |
| 730 | return impl_->SetTaskResultCallbacks(std::move(callbacks)); |
| 731 | } |
| 732 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 733 | bool EventManager::OnAppLaunchEvent(RequestId request_id, |
| 734 | const AppLaunchEvent& event) { |
| 735 | return impl_->OnAppLaunchEvent(request_id, event); |
| 736 | } |
| 737 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 738 | bool EventManager::OnJobScheduledEvent(RequestId request_id, |
| 739 | const JobScheduledEvent& event) { |
| 740 | return impl_->OnJobScheduledEvent(request_id, event); |
| 741 | } |
| 742 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 743 | } // namespace iorap::manager |