Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (C) 2019 The Android Open Source Project |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #include "common/debug.h" |
| 18 | #include "common/expected.h" |
| 19 | #include "manager/event_manager.h" |
| 20 | #include "perfetto/rx_producer.h" |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 21 | #include "prefetcher/read_ahead.h" |
| 22 | #include "prefetcher/task_id.h" |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 23 | |
| 24 | #include <android-base/properties.h> |
| 25 | #include <rxcpp/rx.hpp> |
| 26 | |
| 27 | #include <atomic> |
| 28 | #include <functional> |
| 29 | |
| 30 | using rxcpp::observe_on_one_worker; |
| 31 | |
| 32 | namespace iorap::manager { |
| 33 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 34 | using binder::AppLaunchEvent; |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 35 | using binder::JobScheduledEvent; |
| 36 | using binder::RequestId; |
| 37 | using binder::TaskResult; |
| 38 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 39 | using perfetto::PerfettoStreamCommand; |
| 40 | using perfetto::PerfettoTraceProto; |
| 41 | |
// Identifies an app component as a (package, activity name) pair.
//
// Provides conversions to and from the flattened "package/activity" form,
// plus two reversible encodings of that form that contain no '/' so they
// can be used as file names.
struct AppComponentName {
  std::string package;
  std::string activity_name;

  // Returns true if 's' contains the '/' that separates the package from
  // the activity name.
  static bool HasAppComponentName(const std::string& s) {
    return s.find('/') != std::string::npos;
  }

  // "com.foo.bar/.A" -> {"com.foo.bar", ".A"}
  //
  // The split happens at the first '/'. If 's' contains no '/' at all, the
  // whole string becomes the package and the activity name is left empty.
  static AppComponentName FromString(const std::string& s) {
    constexpr char delimiter = '/';

    const size_t delimiter_pos = s.find(delimiter);
    if (delimiter_pos == std::string::npos) {
      return {s, std::string{}};
    }

    // Everything before the delimiter is the package, everything after it
    // (delimiter excluded) is the activity name.
    return {s.substr(0, delimiter_pos), s.substr(delimiter_pos + 1)};
  }

  // {"com.foo.bar", ".A"} -> "com.foo.bar/.A"
  std::string ToString() const {
    return package + "/" + activity_name;
  }

  /*
   * '/' is encoded into %2F
   * '%' is encoded into %25
   *
   * This allows the component name to be used as a file name
   * ('/' is illegal due to being a path separator) with minimal
   * munging.
   */

  // "com.foo.bar%2F.A%25" -> {"com.foo.bar", ".A%"}
  static AppComponentName FromUrlEncodedString(const std::string& s) {
    std::string cpy = s;
    // Decode in the reverse order of ToUrlEncodedString: undo the '/'
    // escape first, then the '%' escape, so that an encoded literal "%2F"
    // (i.e. "%252F") is not mistaken for an encoded '/'.
    Replace(cpy, "%2F", "/");
    Replace(cpy, "%25", "%");

    return FromString(cpy);
  }

  // {"com.foo.bar", ".A%"} -> "com.foo.bar%2F.A%25"
  std::string ToUrlEncodedString() const {
    std::string s = ToString();
    // '%' must be escaped first, otherwise the '%' introduced by "%2F"
    // would itself get escaped.
    Replace(s, "%", "%25");
    Replace(s, "/", "%2F");
    return s;
  }

  /*
   * '/' is encoded into @@
   * '%' is encoded into ^^
   *
   * Two purposes:
   * 1. This allows the component name to be used as a file name
   *    ('/' is illegal due to being a path separator) with minimal
   *    munging.
   * 2. This allows the component name to be used in a .mk file because
   *    '%' is a special char and cannot be easily escaped in a Makefile.
   *
   * This is a workaround for testing purposes.
   * Hopefully, the double "@@" and "^^" are not used in other cases.
   */

  // "com.foo.bar@@.A^^" -> {"com.foo.bar", ".A%"}
  static AppComponentName FromMakFileSafeEncodedString(const std::string& s) {
    std::string cpy = s;
    Replace(cpy, "@@", "/");
    Replace(cpy, "^^", "%");

    return FromString(cpy);
  }

  // {"com.foo.bar", ".A%"} -> "com.foo.bar@@.A^^"
  std::string ToMakeFileSafeEncodedString() const {
    std::string s = ToString();
    Replace(s, "/", "@@");
    Replace(s, "%", "^^");
    return s;
  }

 private:
  // Replaces every non-overlapping occurrence of 'from' in 'str' with 'to',
  // scanning left to right.
  //
  // Returns true if at least one replacement was made. An empty 'from'
  // never matches.
  static bool Replace(std::string& str, const std::string& from, const std::string& to) {
    if (from.empty()) {
      return false;
    }

    bool replaced = false;
    size_t pos = 0;
    while ((pos = str.find(from, pos)) != std::string::npos) {
      str.replace(pos, from.length(), to);
      // Resume after the inserted text so that a 'to' which itself contains
      // 'from' (e.g. "%" -> "%25") is not replaced again.
      pos += to.length();
      replaced = true;
    }

    return replaced;
  }
};
| 137 | |
| 138 | std::ostream& operator<<(std::ostream& os, const AppComponentName& name) { |
| 139 | os << name.ToString(); |
| 140 | return os; |
| 141 | } |
| 142 | |
| 143 | // Main logic of the #OnAppLaunchEvent scan method. |
| 144 | // |
| 145 | // All functions are called from the same thread as the event manager |
| 146 | // functions. |
| 147 | // |
| 148 | // This is a data type, it's moved (std::move) around from one iteration |
| 149 | // of #scan to another. |
| 150 | struct AppLaunchEventState { |
| 151 | std::optional<AppComponentName> component_name_; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 152 | // Sequence ID is shared amongst the same app launch sequence, |
| 153 | // but changes whenever a new app launch sequence begins. |
| 154 | size_t sequence_id_ = static_cast<size_t>(-1); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 155 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 156 | prefetcher::ReadAhead read_ahead_; |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 157 | bool allowed_readahead_{true}; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 158 | bool is_read_ahead_{false}; |
| 159 | std::optional<prefetcher::TaskId> read_ahead_task_; |
| 160 | |
| 161 | bool allowed_tracing_{true}; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 162 | bool is_tracing_{false}; |
| 163 | std::optional<rxcpp::composite_subscription> rx_lifetime_; |
| 164 | std::vector<rxcpp::composite_subscription> rx_in_flight_; |
| 165 | |
| 166 | borrowed<perfetto::RxProducerFactory*> perfetto_factory_; // not null |
| 167 | borrowed<observe_on_one_worker*> thread_; // not null |
| 168 | borrowed<observe_on_one_worker*> io_thread_; // not null |
| 169 | |
| 170 | explicit AppLaunchEventState(borrowed<perfetto::RxProducerFactory*> perfetto_factory, |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 171 | bool allowed_readahead, |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 172 | bool allowed_tracing, |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 173 | borrowed<observe_on_one_worker*> thread, |
| 174 | borrowed<observe_on_one_worker*> io_thread) { |
| 175 | perfetto_factory_ = perfetto_factory; |
| 176 | DCHECK(perfetto_factory_ != nullptr); |
| 177 | |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 178 | allowed_readahead_ = allowed_readahead; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 179 | allowed_tracing_ = allowed_tracing; |
| 180 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 181 | thread_ = thread; |
| 182 | DCHECK(thread_ != nullptr); |
| 183 | |
| 184 | io_thread_ = io_thread; |
| 185 | DCHECK(io_thread_ != nullptr); |
| 186 | } |
| 187 | |
| 188 | // Updates the values in this struct only as a side effect. |
| 189 | // |
| 190 | // May create and fire a new rx chain on the same threads as passed |
| 191 | // in by the constructors. |
| 192 | void OnNewEvent(const AppLaunchEvent& event) { |
| 193 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: " << event; |
| 194 | |
| 195 | using Type = AppLaunchEvent::Type; |
| 196 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 197 | DCHECK_GE(event.sequence_id, 0); |
| 198 | sequence_id_ = static_cast<size_t>(event.sequence_id); |
| 199 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 200 | switch (event.type) { |
| 201 | case Type::kIntentStarted: { |
| 202 | DCHECK(!IsTracing()); |
| 203 | // Optimistically start tracing if we have the activity in the intent. |
| 204 | if (!event.intent_proto->has_component()) { |
| 205 | // Can't do anything if there is no component in the proto. |
| 206 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: no component, can't trace"; |
| 207 | break; |
| 208 | } |
| 209 | |
| 210 | const std::string& package_name = event.intent_proto->component().package_name(); |
| 211 | const std::string& class_name = event.intent_proto->component().class_name(); |
| 212 | AppComponentName component_name{package_name, class_name}; |
| 213 | |
| 214 | component_name_ = component_name; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 215 | |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 216 | if (allowed_readahead_) { |
| 217 | StartReadAhead(sequence_id_, component_name); |
| 218 | } |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 219 | if (allowed_tracing_) { |
| 220 | rx_lifetime_ = StartTracing(std::move(component_name)); |
| 221 | } |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 222 | |
| 223 | break; |
| 224 | } |
| 225 | case Type::kIntentFailed: |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 226 | if (allowed_tracing_) { |
| 227 | AbortTrace(); |
| 228 | } |
| 229 | AbortReadAhead(); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 230 | break; |
| 231 | case Type::kActivityLaunched: { |
| 232 | // Cancel tracing for warm/hot. |
| 233 | // Restart tracing if the activity was unexpected. |
| 234 | |
| 235 | AppLaunchEvent::Temperature temperature = event.temperature; |
| 236 | if (temperature != AppLaunchEvent::Temperature::kCold) { |
| 237 | LOG(DEBUG) << "AppLaunchEventState#OnNewEvent aborting trace due to non-cold temperature"; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 238 | |
| 239 | if (allowed_tracing_) { |
| 240 | AbortTrace(); |
| 241 | } |
| 242 | AbortReadAhead(); |
| 243 | } else if (!IsTracing() || !IsReadAhead()) { // and the temperature is Cold. |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 244 | // Start late trace when intent didn't have a component name |
| 245 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent need to start new trace"; |
| 246 | |
| 247 | const std::string& title = event.activity_record_proto->identifier().title(); |
| 248 | if (!AppComponentName::HasAppComponentName(title)) { |
| 249 | // Proto comment claim this is sometimes a window title. |
| 250 | // We need the actual 'package/component' here, so just ignore it if it's a title. |
| 251 | LOG(WARNING) << "App launched without a component name: " << event; |
| 252 | break; |
| 253 | } |
| 254 | |
| 255 | AppComponentName component_name = AppComponentName::FromString(title); |
| 256 | |
| 257 | component_name_ = component_name; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 258 | |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 259 | if (allowed_readahead_ && !IsReadAhead()) { |
| 260 | StartReadAhead(sequence_id_, component_name); |
| 261 | } |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 262 | if (allowed_tracing_ && !IsTracing()) { |
| 263 | rx_lifetime_ = StartTracing(std::move(component_name)); |
| 264 | } |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 265 | } else { |
| 266 | // FIXME: match actual component name against intent component name. |
| 267 | // abort traces if they don't match. |
| 268 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 269 | if (allowed_tracing_) { |
| 270 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already tracing"; |
| 271 | } |
| 272 | LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already doing readahead"; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 273 | } |
| 274 | break; |
| 275 | } |
| 276 | case Type::kActivityLaunchFinished: |
| 277 | // Finish tracing and collect trace buffer. |
| 278 | // |
| 279 | // TODO: this happens automatically when perfetto finishes its |
| 280 | // trace duration. |
| 281 | if (IsTracing()) { |
| 282 | MarkPendingTrace(); |
| 283 | } |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 284 | if (IsReadAhead()) { |
| 285 | FinishReadAhead(); |
| 286 | } |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 287 | break; |
| 288 | case Type::kActivityLaunchCancelled: |
| 289 | // Abort tracing. |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 290 | if (allowed_tracing_) { |
| 291 | AbortTrace(); |
| 292 | } |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 293 | if (IsReadAhead()) { |
| 294 | AbortReadAhead(); |
| 295 | } |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 296 | break; |
| 297 | default: |
| 298 | DCHECK(false) << "invalid type: " << event; // binder layer should've rejected this. |
| 299 | LOG(ERROR) << "invalid type: " << event; // binder layer should've rejected this. |
| 300 | } |
| 301 | } |
| 302 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 303 | // Is there an in-flight readahead task currently? |
| 304 | bool IsReadAhead() const { |
| 305 | return read_ahead_task_.has_value(); |
| 306 | } |
| 307 | |
| 308 | void StartReadAhead(size_t id, const AppComponentName& component_name) { |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 309 | DCHECK(allowed_readahead_); |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 310 | DCHECK(!IsReadAhead()); |
| 311 | |
Yan Wang | cb4cc8c | 2019-08-19 12:34:06 -0700 | [diff] [blame] | 312 | // This is changed from "/data/misc/iorapd/" for testing purpose. |
| 313 | // Should be restored b/139831359. |
| 314 | std::string file_path = "/product/iorap-trace/"; |
Yan Wang | 2fa45ca | 2019-08-28 01:45:54 +0000 | [diff] [blame^] | 315 | file_path += component_name.ToMakeFileSafeEncodedString(); |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 316 | file_path += ".compiled_trace.pb"; |
| 317 | |
| 318 | prefetcher::TaskId task{id, std::move(file_path)}; |
| 319 | read_ahead_.BeginTask(task); |
| 320 | // TODO: non-void return signature? |
| 321 | |
| 322 | read_ahead_task_ = std::move(task); |
| 323 | } |
| 324 | |
| 325 | void FinishReadAhead() { |
| 326 | DCHECK(IsReadAhead()); |
| 327 | |
| 328 | read_ahead_.FinishTask(*read_ahead_task_); |
| 329 | read_ahead_task_ = std::nullopt; |
| 330 | } |
| 331 | |
| 332 | void AbortReadAhead() { |
| 333 | FinishReadAhead(); |
| 334 | } |
| 335 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 336 | bool IsTracing() const { |
| 337 | return is_tracing_; |
| 338 | } |
| 339 | |
| 340 | rxcpp::composite_subscription StartTracing(AppComponentName component_name) { |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 341 | DCHECK(allowed_tracing_); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 342 | DCHECK(!IsTracing()); |
| 343 | |
| 344 | auto /*observable<PerfettoStreamCommand>*/ perfetto_commands = |
| 345 | rxcpp::observable<>::just(PerfettoStreamCommand::kStartTracing) |
| 346 | // wait 1x |
| 347 | .concat( |
| 348 | // Pick a value longer than the perfetto config delay_ms, so that we send |
| 349 | // 'kShutdown' after tracing has already finished. |
| 350 | rxcpp::observable<>::interval(std::chrono::milliseconds(10000)) |
| 351 | .take(2) // kStopTracing, kShutdown. |
| 352 | .map([](int value) { |
| 353 | // value is 1,2,3,... |
| 354 | return static_cast<PerfettoStreamCommand>(value); // 1,2, ... |
| 355 | }) |
| 356 | ); |
| 357 | |
| 358 | auto /*observable<PerfettoTraceProto>*/ trace_proto_stream = |
| 359 | perfetto_factory_->CreateTraceStream(perfetto_commands); |
| 360 | // This immediately connects to perfetto asynchronously. |
| 361 | // |
| 362 | // TODO: create a perfetto handle earlier, to minimize perfetto startup latency. |
| 363 | |
| 364 | rxcpp::composite_subscription lifetime; |
| 365 | |
| 366 | trace_proto_stream |
| 367 | .tap([](const PerfettoTraceProto& trace_proto) { |
| 368 | LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (1)"; |
| 369 | }) |
| 370 | .observe_on(*thread_) // All work prior to 'observe_on' is handled on thread_. |
| 371 | .subscribe_on(*thread_) // All work prior to 'observe_on' is handled on thread_. |
| 372 | .observe_on(*io_thread_) // Write data on an idle-class-priority thread. |
| 373 | .tap([](const PerfettoTraceProto& trace_proto) { |
| 374 | LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (2)"; |
| 375 | }) |
| 376 | .as_blocking() // TODO: remove. |
| 377 | .subscribe(/*out*/lifetime, |
| 378 | /*on_next*/[component_name] |
| 379 | (PerfettoTraceProto trace_proto) { |
| 380 | std::string file_path = "/data/misc/iorapd/"; |
| 381 | file_path += component_name.ToUrlEncodedString(); |
| 382 | file_path += ".perfetto_trace.pb"; |
| 383 | |
| 384 | // TODO: timestamp each file into a subdirectory. |
| 385 | |
| 386 | if (!trace_proto.WriteFullyToFile(file_path)) { |
| 387 | LOG(ERROR) << "Failed to save TraceBuffer to " << file_path; |
| 388 | } else { |
| 389 | LOG(INFO) << "Perfetto TraceBuffer saved to file: " << file_path; |
| 390 | } |
| 391 | }, |
| 392 | /*on_error*/[](rxcpp::util::error_ptr err) { |
| 393 | LOG(ERROR) << "Perfetto trace proto collection error: " << rxcpp::util::what(err); |
| 394 | }); |
| 395 | |
| 396 | is_tracing_ = true; |
| 397 | |
| 398 | return lifetime; |
| 399 | } |
| 400 | |
| 401 | void AbortTrace() { |
| 402 | LOG(VERBOSE) << "AppLaunchEventState - AbortTrace"; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 403 | DCHECK(allowed_tracing_); |
| 404 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 405 | is_tracing_ = false; |
| 406 | if (rx_lifetime_) { |
| 407 | // TODO: it would be good to call perfetto Destroy. |
| 408 | |
| 409 | LOG(VERBOSE) << "AppLaunchEventState - AbortTrace - Unsubscribe"; |
| 410 | rx_lifetime_->unsubscribe(); |
| 411 | rx_lifetime_.reset(); |
| 412 | } |
| 413 | } |
| 414 | |
| 415 | void MarkPendingTrace() { |
| 416 | LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace"; |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 417 | DCHECK(allowed_tracing_); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 418 | DCHECK(is_tracing_); |
| 419 | DCHECK(rx_lifetime_.has_value()); |
| 420 | |
| 421 | if (rx_lifetime_) { |
| 422 | LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime moved"; |
| 423 | // Don't unsubscribe because that would cause the perfetto TraceBuffer |
| 424 | // to get dropped on the floor. |
| 425 | // |
| 426 | // Instead, we want to let it finish and write it out to a file. |
| 427 | rx_in_flight_.push_back(*std::move(rx_lifetime_)); |
| 428 | rx_lifetime_.reset(); |
| 429 | } else { |
| 430 | LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime was empty"; |
| 431 | } |
| 432 | |
| 433 | // FIXME: how do we clear this vector? |
| 434 | } |
| 435 | }; |
| 436 | |
| 437 | // Convert callback pattern into reactive pattern. |
| 438 | struct AppLaunchEventSubject { |
| 439 | using RefWrapper = |
| 440 | std::reference_wrapper<const AppLaunchEvent>; |
| 441 | |
| 442 | AppLaunchEventSubject() {} |
| 443 | |
| 444 | void Subscribe(rxcpp::subscriber<RefWrapper> subscriber) { |
| 445 | DCHECK(ready_ != true) << "Cannot Subscribe twice"; |
| 446 | |
| 447 | subscriber_ = std::move(subscriber); |
| 448 | |
| 449 | // Release edge of synchronizes-with AcquireIsReady. |
| 450 | ready_.store(true); |
| 451 | } |
| 452 | |
| 453 | void OnNext(const AppLaunchEvent& e) { |
| 454 | if (!AcquireIsReady()) { |
| 455 | return; |
| 456 | } |
| 457 | |
| 458 | if (!subscriber_->is_subscribed()) { |
| 459 | return; |
| 460 | } |
| 461 | |
| 462 | /* |
| 463 | * TODO: fix upstream. |
| 464 | * |
| 465 | * Rx workaround: this fails to compile when |
| 466 | * the observable is a reference type: |
| 467 | * |
| 468 | * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:354:18: error: multiple overloads of 'on_next' instantiate to the same signature 'void (const iorap::binder::AppLaunchEvent &) const' |
| 469 | * virtual void on_next(T&&) const {}; |
| 470 | * |
| 471 | * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:353:18: note: previous declaration is here |
| 472 | * virtual void on_next(T&) const {}; |
| 473 | * |
| 474 | * (The workaround is to use reference_wrapper instead |
| 475 | * of const AppLaunchEvent&) |
| 476 | */ |
| 477 | subscriber_->on_next(std::cref(e)); |
| 478 | |
| 479 | } |
| 480 | |
| 481 | void OnCompleted() { |
| 482 | if (!AcquireIsReady()) { |
| 483 | return; |
| 484 | } |
| 485 | |
| 486 | subscriber_->on_completed(); |
| 487 | } |
| 488 | |
| 489 | private: |
| 490 | bool AcquireIsReady() { |
| 491 | // Synchronizes-with the release-edge in Subscribe. |
| 492 | // This can happen much later, only once the subscription actually happens. |
| 493 | |
| 494 | // However, as far as I know, 'rxcpp::subscriber' is not thread safe, |
| 495 | // (but the observable chain itself can be made thread-safe via #observe_on, etc). |
| 496 | // so we must avoid reading it until it has been fully synchronized. |
| 497 | // |
| 498 | // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics, |
| 499 | // to make it simpler. |
| 500 | return ready_.load(); |
| 501 | } |
| 502 | |
| 503 | // TODO: also track the RequestId ? |
| 504 | |
| 505 | std::atomic<bool> ready_{false}; |
| 506 | |
| 507 | |
| 508 | std::optional<rxcpp::subscriber<RefWrapper>> subscriber_; |
| 509 | }; |
| 510 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 511 | // Convert callback pattern into reactive pattern. |
| 512 | struct JobScheduledEventSubject { |
| 513 | JobScheduledEventSubject() {} |
| 514 | |
| 515 | void Subscribe(rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>> subscriber) { |
| 516 | DCHECK(ready_ != true) << "Cannot Subscribe twice"; |
| 517 | |
| 518 | subscriber_ = std::move(subscriber); |
| 519 | |
| 520 | // Release edge of synchronizes-with AcquireIsReady. |
| 521 | ready_.store(true); |
| 522 | } |
| 523 | |
| 524 | void OnNext(RequestId request_id, JobScheduledEvent e) { |
| 525 | if (!AcquireIsReady()) { |
| 526 | return; |
| 527 | } |
| 528 | |
| 529 | if (!subscriber_->is_subscribed()) { |
| 530 | return; |
| 531 | } |
| 532 | |
| 533 | subscriber_->on_next(std::pair<RequestId, JobScheduledEvent>{std::move(request_id), std::move(e)}); |
| 534 | |
| 535 | } |
| 536 | |
| 537 | void OnCompleted() { |
| 538 | if (!AcquireIsReady()) { |
| 539 | return; |
| 540 | } |
| 541 | |
| 542 | subscriber_->on_completed(); |
| 543 | } |
| 544 | |
| 545 | private: |
| 546 | bool AcquireIsReady() { |
| 547 | // Synchronizes-with the release-edge in Subscribe. |
| 548 | // This can happen much later, only once the subscription actually happens. |
| 549 | |
| 550 | // However, as far as I know, 'rxcpp::subscriber' is not thread safe, |
| 551 | // (but the observable chain itself can be made thread-safe via #observe_on, etc). |
| 552 | // so we must avoid reading it until it has been fully synchronized. |
| 553 | // |
| 554 | // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics, |
| 555 | // to make it simpler. |
| 556 | return ready_.load(); |
| 557 | } |
| 558 | |
| 559 | // TODO: also track the RequestId ? |
| 560 | |
| 561 | std::atomic<bool> ready_{false}; |
| 562 | |
| 563 | std::optional<rxcpp::subscriber<std::pair<RequestId, JobScheduledEvent>>> subscriber_; |
| 564 | }; |
| 565 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 566 | class EventManager::Impl { |
| 567 | public: |
| 568 | Impl(/*borrow*/perfetto::RxProducerFactory& perfetto_factory) |
| 569 | : perfetto_factory_(perfetto_factory), |
| 570 | worker_thread_(rxcpp::observe_on_new_thread()), |
| 571 | worker_thread2_(rxcpp::observe_on_new_thread()), |
| 572 | io_thread_(perfetto::ObserveOnNewIoThread()) { |
| 573 | |
| 574 | // TODO: read all properties from one config class. |
| 575 | tracing_allowed_ = ::android::base::GetBoolProperty("iorapd.perfetto.enable", /*default*/false); |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 576 | readahead_allowed_ = ::android::base::GetBoolProperty("iorapd.readahead.enable", |
| 577 | /*default*/false); |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 578 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 579 | rx_lifetime_ = InitializeRxGraph(); |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 580 | rx_lifetime_jobs_ = InitializeRxGraphForJobScheduledEvents(); |
| 581 | } |
| 582 | |
| 583 | void SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) { |
| 584 | DCHECK(callbacks_.expired()); |
| 585 | callbacks_ = callbacks; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 586 | } |
| 587 | |
| 588 | bool OnAppLaunchEvent(RequestId request_id, |
| 589 | const AppLaunchEvent& event) { |
| 590 | LOG(VERBOSE) << "EventManager::OnAppLaunchEvent(" |
| 591 | << "request_id=" << request_id.request_id << "," |
| 592 | << event; |
| 593 | |
| 594 | app_launch_event_subject_.OnNext(event); |
| 595 | |
| 596 | return true; |
| 597 | } |
| 598 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 599 | bool OnJobScheduledEvent(RequestId request_id, |
| 600 | const JobScheduledEvent& event) { |
| 601 | LOG(VERBOSE) << "EventManager::OnJobScheduledEvent(" |
| 602 | << "request_id=" << request_id.request_id << ",event=TODO)."; |
| 603 | |
| 604 | job_scheduled_event_subject_.OnNext(std::move(request_id), event); |
| 605 | |
| 606 | return true; // No errors. |
| 607 | } |
| 608 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 609 | rxcpp::composite_subscription InitializeRxGraph() { |
| 610 | LOG(VERBOSE) << "EventManager::InitializeRxGraph"; |
| 611 | |
| 612 | app_launch_events_ = rxcpp::observable<>::create<AppLaunchEventRefWrapper>( |
| 613 | [&](rxcpp::subscriber<AppLaunchEventRefWrapper> subscriber) { |
| 614 | app_launch_event_subject_.Subscribe(std::move(subscriber)); |
| 615 | }); |
| 616 | |
| 617 | rxcpp::composite_subscription lifetime; |
| 618 | |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 619 | if (!tracing_allowed_) { |
| 620 | LOG(WARNING) << "Tracing disabled by iorapd.perfetto.enable=false"; |
| 621 | } |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 622 | if (!readahead_allowed_) { |
| 623 | LOG(WARNING) << "Readahead disabled by iorapd.readahead.enable=false"; |
| 624 | } |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 625 | |
| 626 | AppLaunchEventState initial_state{&perfetto_factory_, |
Igor Murashkin | a7bc40a | 2019-06-10 16:04:56 -0700 | [diff] [blame] | 627 | readahead_allowed_, |
Igor Murashkin | f2e0106 | 2019-04-12 15:57:55 -0700 | [diff] [blame] | 628 | tracing_allowed_, |
| 629 | &worker_thread2_, |
| 630 | &io_thread_}; |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 631 | app_launch_events_ |
| 632 | .subscribe_on(worker_thread_) |
| 633 | .scan(std::move(initial_state), |
| 634 | [](AppLaunchEventState state, AppLaunchEventRefWrapper event) { |
| 635 | state.OnNewEvent(event.get()); |
| 636 | return state; |
| 637 | }) |
| 638 | .subscribe(/*out*/lifetime, [](const AppLaunchEventState& state) { |
| 639 | // Intentionally left blank. |
| 640 | (void)state; |
| 641 | }); |
| 642 | |
| 643 | return lifetime; |
| 644 | } |
| 645 | |
  // Builds the rx chain that handles job-scheduled events and returns the
  // subscription that keeps the chain alive.
  //
  // For every (RequestId, JobScheduledEvent) pair pushed through
  // job_scheduled_event_subject_ the chain:
  //   1. reports TaskResult::State::kBegan through #NotifyProgress,
  //   2. runs the (currently placeholder) processing step, and
  //   3. reports TaskResult::State::kCompleted through #NotifyComplete.
  rxcpp::composite_subscription InitializeRxGraphForJobScheduledEvents() {
    LOG(VERBOSE) << "EventManager::InitializeRxGraphForJobScheduledEvents";

    using RequestAndJobEvent = std::pair<RequestId, JobScheduledEvent>;

    // Source observable: subscribing to it hands the subscriber over to the
    // subject, which #OnJobScheduledEvent then feeds.
    job_scheduled_events_ = rxcpp::observable<>::create<RequestAndJobEvent>(
        [&](rxcpp::subscriber<RequestAndJobEvent> subscriber) {
          job_scheduled_event_subject_.Subscribe(std::move(subscriber));
        });

    rxcpp::composite_subscription lifetime;

    job_scheduled_events_
        .observe_on(worker_thread_)  // async handling.
        .tap([this](const RequestAndJobEvent& e) {
          LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(1) - job begins";
          this->NotifyProgress(e.first, TaskResult{TaskResult::State::kBegan});

          // TODO: probably this shouldn't be emitted until most of the usual DCHECKs
          // (for example, validate a job isn't already started, the request is not reused, etc).
          // In this way we could block from the client until it sees 'kBegan' and Log.wtf otherwise.
        })
        .tap([](const RequestAndJobEvent& e) {
          // TODO. Actual work.
          LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(2) - job is being processed";

          // TODO: abort functionality for in-flight jobs.
          //
          // maybe something like scan that returns an observable<Job> + flat map to that job.
          // then we could unsubscribe from the scan to do a partial abort? need to try it and see if it works.
          //
          // other option is to create a new outer subscription for each job id which seems less ideal.
        })
        .subscribe(/*out*/lifetime,
                   /*on_next*/
                   [this](const RequestAndJobEvent& e) {
                     LOG(VERBOSE) << "EventManager#JobScheduledEvent#subscribe - job completed";
                     this->NotifyComplete(e.first, TaskResult{TaskResult::State::kCompleted});
                   }
// The on_error handler below is compiled out, so only on_next is passed to
// subscribe.
#if 0
                   ,
                   /*on_error*/
                   [](rxcpp::util::error_ptr err) {
                     LOG(ERROR) << "Scheduled job event failed: " << rxcpp::util::what(err);

                     //std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock();
                     //if (callbacks != nullptr) {
                       // FIXME: How do we get the request ID back out of the error? Seems like a problem.
                       // callbacks->OnComplete(, TaskResult{TaskResult::kError});
                       // We may have to wrap with an iorap::expected instead of using on_error.
                     //}

                     // FIXME: need to add a 'OnErrorResumeNext' operator?
                     DCHECK(false) << "forgot to implement OnErrorResumeNext";
                   }
#endif
        );

    // TODO: error output should happen via an observable.

    return lifetime;
  }
| 708 | |
| 709 | void NotifyComplete(RequestId request_id, TaskResult result) { |
| 710 | std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock(); |
| 711 | if (callbacks != nullptr) { |
| 712 | callbacks->OnComplete(std::move(request_id), std::move(result)); |
| 713 | } else { |
| 714 | LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early"; |
| 715 | } |
| 716 | } |
| 717 | |
| 718 | void NotifyProgress(RequestId request_id, TaskResult result) { |
| 719 | std::shared_ptr<TaskResultCallbacks> callbacks = callbacks_.lock(); |
| 720 | if (callbacks != nullptr) { |
| 721 | callbacks->OnProgress(std::move(request_id), std::move(result)); |
| 722 | } else { |
| 723 | LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early"; |
| 724 | } |
| 725 | } |
| 726 | |
// Defaults to true. NOTE(review): the code that reads or toggles this flag is
// not visible in this part of the file.
bool readahead_allowed_{true};

// Held by reference (not owned), so the factory has to outlive this object.
perfetto::RxProducerFactory& perfetto_factory_;
// Defaults to true. NOTE(review): the code that reads or toggles this flag is
// not visible in this part of the file.
bool tracing_allowed_{true};

// Locked by NotifyComplete/NotifyProgress before each callback invocation.
std::weak_ptr<TaskResultCallbacks> callbacks_; // avoid cycles with weakptr.

// Observable of app launch events and the subject associated with it.
using AppLaunchEventRefWrapper = AppLaunchEventSubject::RefWrapper;
rxcpp::observable<AppLaunchEventRefWrapper> app_launch_events_;
AppLaunchEventSubject app_launch_event_subject_;

// Observable of (request id, job scheduled event) pairs and the subject
// associated with it.
rxcpp::observable<std::pair<RequestId, JobScheduledEvent>> job_scheduled_events_;
JobScheduledEventSubject job_scheduled_event_subject_;

rxcpp::observable<RequestId> completed_requests_;

// regular-priority thread to handle binder callbacks.
observe_on_one_worker worker_thread_;
observe_on_one_worker worker_thread2_;
// low priority idle-class thread for IO operations.
observe_on_one_worker io_thread_;

// Subscription handles, one per event stream.
rxcpp::composite_subscription rx_lifetime_; // app launch events
rxcpp::composite_subscription rx_lifetime_jobs_; // job scheduled events
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 751 | |
| 752 | //INTENTIONAL_COMPILER_ERROR_HERE: |
| 753 | // FIXME: |
| 754 | // ok so we want to expose a 'BlockingSubscribe' or a 'Subscribe' or some kind of function |
| 755 | // that the main thread can call. This would subscribe on all the observables we internally |
| 756 | // have here (probably on an event-manager-dedicated thread for simplicity). |
| 757 | // |
| 758 | // ideally we'd just reuse the binder thread to handle the events but I'm not super sure, |
| 759 | // maybe this already works with the identity_current_thread coordination? |
| 760 | }; |
| 761 | using Impl = EventManager::Impl; |
| 762 | |
| 763 | EventManager::EventManager(perfetto::RxProducerFactory& perfetto_factory) |
| 764 | : impl_(new Impl(perfetto_factory)) {} |
| 765 | |
| 766 | std::shared_ptr<EventManager> EventManager::Create() { |
| 767 | static perfetto::PerfettoDependencies::Injector injector{ |
| 768 | perfetto::PerfettoDependencies::CreateComponent |
| 769 | }; |
| 770 | static perfetto::RxProducerFactory producer_factory{ |
| 771 | /*borrow*/injector |
| 772 | }; |
| 773 | return EventManager::Create(/*borrow*/producer_factory); |
| 774 | } |
| 775 | |
| 776 | std::shared_ptr<EventManager> EventManager::Create(perfetto::RxProducerFactory& perfetto_factory) { |
| 777 | std::shared_ptr<EventManager> p{new EventManager{/*borrow*/perfetto_factory}}; |
| 778 | return p; |
| 779 | } |
| 780 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 781 | void EventManager::SetTaskResultCallbacks(std::shared_ptr<TaskResultCallbacks> callbacks) { |
| 782 | return impl_->SetTaskResultCallbacks(std::move(callbacks)); |
| 783 | } |
| 784 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 785 | bool EventManager::OnAppLaunchEvent(RequestId request_id, |
| 786 | const AppLaunchEvent& event) { |
| 787 | return impl_->OnAppLaunchEvent(request_id, event); |
| 788 | } |
| 789 | |
Igor Murashkin | a128a86 | 2019-02-01 13:26:37 -0800 | [diff] [blame] | 790 | bool EventManager::OnJobScheduledEvent(RequestId request_id, |
| 791 | const JobScheduledEvent& event) { |
| 792 | return impl_->OnJobScheduledEvent(request_id, event); |
| 793 | } |
| 794 | |
Igor Murashkin | e54aebc | 2019-01-22 17:58:51 -0800 | [diff] [blame] | 795 | } // namespace iorap::manager |