Tests: Add client library + service stress test

Add initial version of the client library & service stress test.
This tests tries to push the producer & service to its limits
by spawning a configurable number of processes and threads,
each writing events at a customizable rate.

Some work is still left, I still want to tune the configs and
reason on what to expect. Meanwhile checking in the harness.

Change-Id: Iea0b01e52018125e44f42c5b61c687c60315893b
diff --git a/Android.bp b/Android.bp
index 1da2804..37c42cf 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2155,6 +2155,7 @@
   srcs: [
     "protos/perfetto/config/chrome/chrome_config.proto",
     "protos/perfetto/config/data_source_config.proto",
+    "protos/perfetto/config/stress_test_config.proto",
     "protos/perfetto/config/test_config.proto",
     "protos/perfetto/config/trace_config.proto",
   ],
@@ -2166,6 +2167,7 @@
   out: [
     "external/perfetto/protos/perfetto/config/chrome/chrome_config.gen.cc",
     "external/perfetto/protos/perfetto/config/data_source_config.gen.cc",
+    "external/perfetto/protos/perfetto/config/stress_test_config.gen.cc",
     "external/perfetto/protos/perfetto/config/test_config.gen.cc",
     "external/perfetto/protos/perfetto/config/trace_config.gen.cc",
   ],
@@ -2177,6 +2179,7 @@
   srcs: [
     "protos/perfetto/config/chrome/chrome_config.proto",
     "protos/perfetto/config/data_source_config.proto",
+    "protos/perfetto/config/stress_test_config.proto",
     "protos/perfetto/config/test_config.proto",
     "protos/perfetto/config/trace_config.proto",
   ],
@@ -2188,6 +2191,7 @@
   out: [
     "external/perfetto/protos/perfetto/config/chrome/chrome_config.gen.h",
     "external/perfetto/protos/perfetto/config/data_source_config.gen.h",
+    "external/perfetto/protos/perfetto/config/stress_test_config.gen.h",
     "external/perfetto/protos/perfetto/config/test_config.gen.h",
     "external/perfetto/protos/perfetto/config/trace_config.gen.h",
   ],
@@ -2533,6 +2537,7 @@
   srcs: [
     "protos/perfetto/config/chrome/chrome_config.proto",
     "protos/perfetto/config/data_source_config.proto",
+    "protos/perfetto/config/stress_test_config.proto",
     "protos/perfetto/config/test_config.proto",
     "protos/perfetto/config/trace_config.proto",
   ],
@@ -2543,6 +2548,7 @@
   out: [
     "external/perfetto/protos/perfetto/config/chrome/chrome_config.pb.cc",
     "external/perfetto/protos/perfetto/config/data_source_config.pb.cc",
+    "external/perfetto/protos/perfetto/config/stress_test_config.pb.cc",
     "external/perfetto/protos/perfetto/config/test_config.pb.cc",
     "external/perfetto/protos/perfetto/config/trace_config.pb.cc",
   ],
@@ -2554,6 +2560,7 @@
   srcs: [
     "protos/perfetto/config/chrome/chrome_config.proto",
     "protos/perfetto/config/data_source_config.proto",
+    "protos/perfetto/config/stress_test_config.proto",
     "protos/perfetto/config/test_config.proto",
     "protos/perfetto/config/trace_config.proto",
   ],
@@ -2564,6 +2571,7 @@
   out: [
     "external/perfetto/protos/perfetto/config/chrome/chrome_config.pb.h",
     "external/perfetto/protos/perfetto/config/data_source_config.pb.h",
+    "external/perfetto/protos/perfetto/config/stress_test_config.pb.h",
     "external/perfetto/protos/perfetto/config/test_config.pb.h",
     "external/perfetto/protos/perfetto/config/trace_config.pb.h",
   ],
@@ -3133,6 +3141,7 @@
   srcs: [
     "protos/perfetto/config/chrome/chrome_config.proto",
     "protos/perfetto/config/data_source_config.proto",
+    "protos/perfetto/config/stress_test_config.proto",
     "protos/perfetto/config/test_config.proto",
     "protos/perfetto/config/trace_config.proto",
   ],
@@ -3144,6 +3153,7 @@
   out: [
     "external/perfetto/protos/perfetto/config/chrome/chrome_config.pbzero.cc",
     "external/perfetto/protos/perfetto/config/data_source_config.pbzero.cc",
+    "external/perfetto/protos/perfetto/config/stress_test_config.pbzero.cc",
     "external/perfetto/protos/perfetto/config/test_config.pbzero.cc",
     "external/perfetto/protos/perfetto/config/trace_config.pbzero.cc",
   ],
@@ -3155,6 +3165,7 @@
   srcs: [
     "protos/perfetto/config/chrome/chrome_config.proto",
     "protos/perfetto/config/data_source_config.proto",
+    "protos/perfetto/config/stress_test_config.proto",
     "protos/perfetto/config/test_config.proto",
     "protos/perfetto/config/trace_config.proto",
   ],
@@ -3166,6 +3177,7 @@
   out: [
     "external/perfetto/protos/perfetto/config/chrome/chrome_config.pbzero.h",
     "external/perfetto/protos/perfetto/config/data_source_config.pbzero.h",
+    "external/perfetto/protos/perfetto/config/stress_test_config.pbzero.h",
     "external/perfetto/protos/perfetto/config/test_config.pbzero.h",
     "external/perfetto/protos/perfetto/config/trace_config.pbzero.h",
   ],
diff --git a/BUILD b/BUILD
index b3c511a..576dc0e 100644
--- a/BUILD
+++ b/BUILD
@@ -1837,6 +1837,7 @@
     srcs = [
         "protos/perfetto/config/chrome/chrome_config.proto",
         "protos/perfetto/config/data_source_config.proto",
+        "protos/perfetto/config/stress_test_config.proto",
         "protos/perfetto/config/test_config.proto",
         "protos/perfetto/config/trace_config.proto",
     ],
diff --git a/BUILD.gn b/BUILD.gn
index 0a1b23c..3db5eba 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -81,6 +81,7 @@
   all_targets += [
     ":perfetto_integrationtests",
     "test:client_api_example",
+    "test/stress_test",
   ]
 }
 
diff --git a/include/perfetto/ext/base/subprocess.h b/include/perfetto/ext/base/subprocess.h
index d97ba5b..a57ada9 100644
--- a/include/perfetto/ext/base/subprocess.h
+++ b/include/perfetto/ext/base/subprocess.h
@@ -116,7 +116,8 @@
   enum OutputMode {
     kInherit = 0,  // Inherit's the caller process stdout/stderr.
     kDevNull,      // dup() onto /dev/null
-    kBuffer        // dup() onto a pipe and move it into the output() buffer.
+    kBuffer,       // dup() onto a pipe and move it into the output() buffer.
+    kFd,           // dup() onto the passed args.fd.
   };
 
   // Input arguments for configuring the subprocess behavior.
@@ -153,10 +154,24 @@
     OutputMode stdout_mode = kInherit;
     OutputMode stderr_mode = kInherit;
 
+    base::ScopedFile out_fd;
+
     // Returns " ".join(exec_cmd), quoting arguments.
     std::string GetCmdString() const;
   };
 
+  struct ResourceUsage {
+    uint32_t cpu_utime_ms = 0;
+    uint32_t cpu_stime_ms = 0;
+    uint32_t max_rss_kb = 0;
+    uint32_t min_page_faults = 0;
+    uint32_t maj_page_faults = 0;
+    uint32_t vol_ctx_switch = 0;
+    uint32_t invol_ctx_switch = 0;
+
+    uint32_t cpu_time_ms() const { return cpu_utime_ms + cpu_stime_ms; }
+  };
+
   explicit Subprocess(std::initializer_list<std::string> exec_cmd = {});
   Subprocess(Subprocess&&) noexcept;
   Subprocess& operator=(Subprocess&&);
@@ -183,8 +198,8 @@
 
   Status Poll();
 
-  // Sends a SIGKILL and wait to see the process termination.
-  void KillAndWaitForTermination();
+  // Sends a signal (SIGKILL if not specified) and wait for process termination.
+  void KillAndWaitForTermination(int sig_num = 0);
 
   PlatformProcessId pid() const { return s_.pid; }
 
@@ -198,6 +213,7 @@
   // This contains both stdout and stderr (if the corresponding _mode ==
   // kBuffer). It's non-const so the caller can std::move() it.
   std::string& output() { return s_.output; }
+  const ResourceUsage& rusage() const { return *s_.rusage; }
 
   Args args;
 
@@ -222,6 +238,7 @@
     int returncode = -1;
     std::string output;  // Stdin+stderr. Only when kBuffer.
     std::thread waitpid_thread;
+    std::unique_ptr<ResourceUsage> rusage;
   };
 
   MovableState s_;
diff --git a/include/perfetto/ext/base/temp_file.h b/include/perfetto/ext/base/temp_file.h
index da7e1bf..3598868 100644
--- a/include/perfetto/ext/base/temp_file.h
+++ b/include/perfetto/ext/base/temp_file.h
@@ -24,6 +24,8 @@
 namespace perfetto {
 namespace base {
 
+std::string GetSysTempDir();
+
 class TempFile {
  public:
   static TempFile CreateUnlinked();
diff --git a/protos/perfetto/config/BUILD.gn b/protos/perfetto/config/BUILD.gn
index 8a4a3b8..b17b961 100644
--- a/protos/perfetto/config/BUILD.gn
+++ b/protos/perfetto/config/BUILD.gn
@@ -37,6 +37,7 @@
   sources = [
     "chrome/chrome_config.proto",
     "data_source_config.proto",
+    "stress_test_config.proto",
     "test_config.proto",
     "trace_config.proto",
   ]
diff --git a/protos/perfetto/config/stress_test_config.proto b/protos/perfetto/config/stress_test_config.proto
new file mode 100644
index 0000000..6ed8cbc
--- /dev/null
+++ b/protos/perfetto/config/stress_test_config.proto
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+import "protos/perfetto/config/trace_config.proto";
+
+package perfetto.protos;
+
+// This is the schema for the config files in /test/stress_test/configs/*.cfg.
+message StressTestConfig {
+  optional TraceConfig trace_config = 1;
+
+  // Shared Memory Buffer setup, passed as arguments to Tracing.Initialize().
+  optional uint32 shmem_size_kb = 2;
+  optional uint32 shmem_page_size_kb = 3;
+
+  // How many producer processes to spawn.
+  optional uint32 num_processes = 4;
+
+  // How many writer threads each producer process should spawn.
+  optional uint32 num_threads = 5;
+
+  // The producer will write events until one of the following is met:
+  // - trace_config.duration_ms is reached.
+  // - max_events is reached.
+  optional uint32 max_events = 6;
+
+  // If > 0 will write nested messages up to N levels deep. The size of each
+  // nested message depends on the payload_mean / sttdev arguments (below).
+  // This is to cover the patching logic.
+  optional uint32 nesting = 7;
+
+  // This submessage defines the timings of each writer worker thread.
+  message WriterTiming {
+    // The size of the payload written on each iteration.
+    optional double payload_mean = 1;
+    optional double payload_stddev = 2;
+
+    // The nominal event writing rate, expressed in events/sec.
+    // E.g. if payload_mean = 500 (bytes) and rate_mean = 1000 (Hz), each thread
+    // will write 500 KB / sec approximately (% stalling).
+    optional double rate_mean = 3;
+    optional double rate_stddev = 4;
+
+    // If non-zero each worker will slow down the writing of the payload:
+    // it writes half payload, sleep for payload_write_time_ms, then write the
+    // other half.
+    optional uint32 payload_write_time_ms = 5;
+  }
+
+  // The timings used by default.
+  optional WriterTiming steady_state_timings = 8;
+
+  // Optionally it is possible to cause a writer to enter "burst mode",
+  // simulating peaks of high-intensity writing. The way it works is the
+  // following: by default the writer writes events using the
+  // |steady_state_timings|. Then every |burst_period_ms| it will switch to the
+  // |burst_timings| for |burst_duration_ms|, and go back to the steady state
+  // after that (and then repeat).
+  optional uint32 burst_period_ms = 9;
+  optional uint32 burst_duration_ms = 10;
+  optional WriterTiming burst_timings = 11;
+}
diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto
index af6af01..0be07f5 100644
--- a/protos/perfetto/trace/perfetto_trace.proto
+++ b/protos/perfetto/trace/perfetto_trace.proto
@@ -7044,6 +7044,21 @@
 
   // The current value of the random number sequence used in tests.
   optional uint32 seq_value = 2;
+
+  // Monotonically increased on each packet.
+  optional uint64 counter = 3;
+
+  // No more packets should follow (from the current sequence).
+  optional bool is_last = 4;
+
+  message TestPayload {
+    repeated string str = 1;
+    repeated TestPayload nested = 2;
+
+    // When 0 this is the bottom-most nested message.
+    optional uint32 remaining_nesting_depth = 3;
+  }
+  optional TestPayload payload = 5;
 }
 
 // End of protos/perfetto/trace/test_event.proto
diff --git a/protos/perfetto/trace/test_event.proto b/protos/perfetto/trace/test_event.proto
index 7a6902b..c55b1a5 100644
--- a/protos/perfetto/trace/test_event.proto
+++ b/protos/perfetto/trace/test_event.proto
@@ -25,4 +25,19 @@
 
   // The current value of the random number sequence used in tests.
   optional uint32 seq_value = 2;
+
+  // Monotonically increased on each packet.
+  optional uint64 counter = 3;
+
+  // No more packets should follow (from the current sequence).
+  optional bool is_last = 4;
+
+  message TestPayload {
+    repeated string str = 1;
+    repeated TestPayload nested = 2;
+
+    // When 0 this is the bottom-most nested message.
+    optional uint32 remaining_nesting_depth = 3;
+  }
+  optional TestPayload payload = 5;
 }
diff --git a/src/base/subprocess.cc b/src/base/subprocess.cc
index 0e1fbbd..7c2ec01 100644
--- a/src/base/subprocess.cc
+++ b/src/base/subprocess.cc
@@ -21,6 +21,7 @@
 #include <poll.h>
 #include <signal.h>
 #include <stdio.h>
+#include <sys/resource.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <unistd.h>
@@ -105,6 +106,10 @@
       if (dup2(args->stdouterr_pipe_wr, STDOUT_FILENO) == -1)
         die("Failed to dup2(STDOUT)");
       break;
+    case Subprocess::kFd:
+      if (dup2(*args->create_args->out_fd, STDOUT_FILENO) == -1)
+        die("Failed to dup2(STDOUT)");
+      break;
   }
 
   switch (args->create_args->stderr_mode) {
@@ -119,6 +124,10 @@
       if (dup2(args->stdouterr_pipe_wr, STDERR_FILENO) == -1)
         die("Failed to dup2(STDERR)");
       break;
+    case Subprocess::kFd:
+      if (dup2(*args->create_args->out_fd, STDERR_FILENO) == -1)
+        die("Failed to dup2(STDERR)");
+      break;
   }
 
   // Close all FDs % stdin/out/err and the ones that the client explicitly
@@ -173,7 +182,9 @@
 Subprocess::Args::Args(Args&&) noexcept = default;
 Subprocess::Args& Subprocess::Args::operator=(Args&&) = default;
 
-Subprocess::Subprocess(std::initializer_list<std::string> a) : args(a) {}
+Subprocess::Subprocess(std::initializer_list<std::string> a) : args(a) {
+  s_.rusage.reset(new ResourceUsage());
+}
 
 Subprocess::Subprocess(Subprocess&& other) noexcept {
   static_assert(sizeof(Subprocess) == sizeof(std::tuple<MovableState, Args>),
@@ -257,10 +268,24 @@
   // Both ends of the pipe are closed after the thread.join().
   int pid = s_.pid;
   int exit_status_pipe_wr = s_.exit_status_pipe.wr.release();
-  s_.waitpid_thread = std::thread([pid, exit_status_pipe_wr] {
+  auto* rusage = s_.rusage.get();
+  s_.waitpid_thread = std::thread([pid, exit_status_pipe_wr, rusage] {
     int pid_stat = -1;
-    int wait_res = PERFETTO_EINTR(waitpid(pid, &pid_stat, 0));
+    struct rusage usg {};
+    int wait_res = PERFETTO_EINTR(wait4(pid, &pid_stat, 0, &usg));
     PERFETTO_CHECK(wait_res == pid);
+
+    auto tv_to_ms = [](const struct timeval& tv) {
+      return static_cast<uint32_t>(tv.tv_sec * 1000 + tv.tv_usec / 1000);
+    };
+    rusage->cpu_utime_ms = tv_to_ms(usg.ru_utime);
+    rusage->cpu_stime_ms = tv_to_ms(usg.ru_stime);
+    rusage->max_rss_kb = static_cast<uint32_t>(usg.ru_maxrss) / 1000;
+    rusage->min_page_faults = static_cast<uint32_t>(usg.ru_minflt);
+    rusage->maj_page_faults = static_cast<uint32_t>(usg.ru_majflt);
+    rusage->vol_ctx_switch = static_cast<uint32_t>(usg.ru_nvcsw);
+    rusage->invol_ctx_switch = static_cast<uint32_t>(usg.ru_nivcsw);
+
     base::ignore_result(PERFETTO_EINTR(
         write(exit_status_pipe_wr, &pid_stat, sizeof(pid_stat))));
     PERFETTO_CHECK(close(exit_status_pipe_wr) == 0 || errno == EINTR);
@@ -430,8 +455,8 @@
   }
 }
 
-void Subprocess::KillAndWaitForTermination() {
-  kill(s_.pid, SIGKILL);
+void Subprocess::KillAndWaitForTermination(int sig_num) {
+  kill(s_.pid, sig_num ? sig_num : SIGKILL);
   Wait();
 }
 
diff --git a/src/base/temp_file.cc b/src/base/temp_file.cc
index 59a5089..9df8a3a 100644
--- a/src/base/temp_file.cc
+++ b/src/base/temp_file.cc
@@ -22,27 +22,26 @@
 #include <stdlib.h>
 #include <unistd.h>
 
+#include "perfetto/ext/base/string_utils.h"
+
 namespace perfetto {
 namespace base {
 
-namespace {
+std::string GetSysTempDir() {
+  const char* tmpdir = getenv("TMPDIR");
+  if (tmpdir)
+    return base::StripSuffix(tmpdir, "/");
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-constexpr char kSysTmpPath[] = "/data/local/tmp";
+  return "/data/local/tmp";
 #else
-constexpr char kSysTmpPath[] = "/tmp";
+  return "/tmp";
 #endif
-}  // namespace
+}
 
 // static
 TempFile TempFile::Create() {
   TempFile temp_file;
-  const char* tmpdir = getenv("TMPDIR");
-  if (tmpdir) {
-    temp_file.path_.assign(tmpdir);
-  } else {
-    temp_file.path_.assign(kSysTmpPath);
-  }
-  temp_file.path_.append("/perfetto-XXXXXXXX");
+  temp_file.path_ = GetSysTempDir() + "/perfetto-XXXXXXXX";
   temp_file.fd_.reset(mkstemp(&temp_file.path_[0]));
   if (PERFETTO_UNLIKELY(!temp_file.fd_)) {
     PERFETTO_FATAL("Could not create temp file %s", temp_file.path_.c_str());
@@ -81,8 +80,7 @@
 // static
 TempDir TempDir::Create() {
   TempDir temp_dir;
-  temp_dir.path_.assign(kSysTmpPath);
-  temp_dir.path_.append("/perfetto-XXXXXXXX");
+  temp_dir.path_ = GetSysTempDir() + "/perfetto-XXXXXXXX";
   PERFETTO_CHECK(mkdtemp(&temp_dir.path_[0]));
   return temp_dir;
 }
@@ -96,5 +94,4 @@
 }  // namespace base
 }  // namespace perfetto
 
-
 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
diff --git a/test/stress_test/BUILD.gn b/test/stress_test/BUILD.gn
new file mode 100644
index 0000000..a5fa275
--- /dev/null
+++ b/test/stress_test/BUILD.gn
@@ -0,0 +1,47 @@
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../gn/perfetto.gni")
+
+executable("stress_test") {
+  testonly = true
+  sources = [ "stress_test.cc" ]
+  data_deps = [
+    "../../src/perfetto_cmd:perfetto",
+    "../../src/traced/service:traced",
+  ]
+  deps = [
+    ":stress_producer",
+    "../..:libperfetto_client_experimental",
+    "../../gn:default_deps",
+    "../../include/perfetto/tracing",
+    "../../protos/perfetto/trace:zero",
+    "../../src/base",
+    "../../src/base:test_support",
+    "configs",
+  ]
+}
+
+executable("stress_producer") {
+  testonly = true
+  sources = [ "stress_producer.cc" ]
+  deps = [
+    "../..:libperfetto_client_experimental",
+    "../../gn:default_deps",
+    "../../include/perfetto/tracing",
+    "../../protos/perfetto/trace:zero",
+    "../../src/base",
+    "../../src/base:test_support",
+  ]
+}
diff --git a/test/stress_test/README.md b/test/stress_test/README.md
new file mode 100644
index 0000000..70ee1c2
--- /dev/null
+++ b/test/stress_test/README.md
@@ -0,0 +1,134 @@
+# Perfetto Stress Test
+
+This is a test harness that to stress test the client library (DataSource-level
+only for now).
+
+The test is based on a number of configs in /test/stress_test/configs/*.cfg
+(NOTE: they must be listed in configs/BUILD.gn).
+The config is a /protos/perfetto/config/stress_test_config.proto message, which
+embeds the configuration of the test and a whole trace config.
+
+Each configs defines a testing scenario, determining the general trace config
+and all the settings of the test (e.g., how many producer processes to spawn,
+the write timings).
+
+The test is based on exec()-ing `traced` (the tracing service), `perfetto` (the
+consumer cmdline client) and a variable number of `stress_producer` instances.
+
+`stress_producer` emits events at a configurable rate, writing predictable
+sequences of numbers / string, so that the test harness can easily detect
+corruptions, out-of-order events or gaps.
+
+After running each test, the `stress_test` binary reads back the trace and
+performs a bunch of checks:
+
+- Checks that the number of sequences is exactly equal to #processes x #threads.
+- Checks that each sequence has all the expected packets in the right sequence
+- Checks the payload and correctness of proto nesting of each trace packet.
+- Reports CPU/Memory/Context-switch numbers for the service and producer
+  processes.
+
+Each test config is isolated from the others. All processes are killed and
+re-spawned for each test.
+
+The stdout/err of each process is saved in a dedicated /tmp/ folder, as well as
+the resulting trace.
+
+## Building and running the test
+
+```bash
+# This will recursively build traced, perfetto and stress_producer.
+ninja -C out/default stress_test
+
+out/default/stress_test
+```
+
+will output:
+
+```txt
+[307.909] stress_test.cc:116      Saving test results in /tmp/perfetto-ltIBJgA0
+
+===============================================================
+Config: simple
+===============================================================
+Metric               Expected   Actual
+------               --------   ------
+#Errors              0          0
+Duration [ms]        3000       3109
+Num threads          1          1
+Num packets          1000       1001
+Trace size [KB]      168        170
+Svc RSS [MB]         4          2
+Prod RSS [MB]        ---        1
+Svc CPU [ms]         ---        10
+Prod CPU [ms]        ---        32
+Svc #ctxswitch       ---        103 / 20
+Prod #ctxswitch      ---        1022 / 1
+
+===============================================================
+Config: bursts
+===============================================================
+Metric               Expected   Actual
+------               --------   ------
+#Errors              0          0
+Duration [ms]        2000       2381
+Num threads          10         10
+Num packets          2675       20021
+Trace size [KB]      449        11063
+Svc RSS [MB]         32         17
+Prod RSS [MB]        ---        1
+Svc CPU [ms]         ---        98
+Prod CPU [ms]        ---        17
+Svc #ctxswitch       ---        704 / 1327
+Prod #ctxswitch      ---        421 / 1
+```
+
+```bash
+$ ls -Rlh /tmp/perfetto-ltIBJgA0
+total 0
+drwxr-xr-x  16 primiano  wheel   512B  5 Aug 09:16 bursts
+drwxr-xr-x   9 primiano  wheel   288B  5 Aug 09:16 simple
+drwxr-xr-x  38 primiano  wheel   1.2K  5 Aug 09:16 the_storm
+
+/tmp/perfetto-ltIBJgA0/bursts:
+total 22752
+-rw-r--r--  1 primiano  wheel     0B  5 Aug 09:16 errors.log
+-rw-r--r--  1 primiano  wheel   180B  5 Aug 09:16 perfetto.log
+-rw-r--r--  1 primiano  wheel   441B  5 Aug 09:16 producer.0.log
+...
+-rw-r--r--  1 primiano  wheel   441B  5 Aug 09:16 producer.9.log
+-rw-------  1 primiano  wheel    11M  5 Aug 09:16 trace
+-rw-r--r--  1 primiano  wheel   407B  5 Aug 09:16 traced.log
+
+/tmp/perfetto-ltIBJgA0/simple:
+total 400
+srwxr-xr-x  1 primiano  wheel     0B  5 Aug 09:16 consumer.sock
+-rw-r--r--  1 primiano  wheel     0B  5 Aug 09:16 errors.log
+-rw-r--r--  1 primiano  wheel   178B  5 Aug 09:16 perfetto.log
+-rw-r--r--  1 primiano  wheel     0B  5 Aug 09:16 producer.0.log
+srwxr-xr-x  1 primiano  wheel     0B  5 Aug 09:16 producer.sock
+-rw-------  1 primiano  wheel   167K  5 Aug 09:16 trace
+-rw-r--r--  1 primiano  wheel   406B  5 Aug 09:16 traced.log
+
+/tmp/perfetto-ltIBJgA0/the_storm:
+total 524432
+-rw-r--r--  1 primiano  wheel     0B  5 Aug 09:16 errors.log
+-rw-r--r--  1 primiano  wheel   184B  5 Aug 09:16 perfetto.log
+-rw-r--r--  1 primiano  wheel     0B  5 Aug 09:16 producer.0.log
+...
+-rw-r--r--  1 primiano  wheel     0B  5 Aug 09:16 producer.127.log
+-rw-------  1 primiano  wheel   248M  5 Aug 09:16 trace
+-rw-r--r--  1 primiano  wheel   408B  5 Aug 09:16 traced.log
+```
+
+## TODOs
+
+The following scenarios requires more coverage:
+
+- Nested messages.
+- Force losses and check that the last_dropped flag is consistent.
+- Flushes and scraping.
+- Report data losses in the test output.
+- Multibuffer scenarios.
+- write_into_file=true.
+- Vary page size, smb size.
diff --git a/test/stress_test/configs/BUILD.gn b/test/stress_test/configs/BUILD.gn
new file mode 100644
index 0000000..6a12a09
--- /dev/null
+++ b/test/stress_test/configs/BUILD.gn
@@ -0,0 +1,50 @@
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../../gn/perfetto.gni")
+
+config("include_path") {
+  include_dirs = [ target_gen_dir ]
+}
+
+action("configs") {
+  testonly = true
+
+  sources = [
+    "backfills.cfg",
+    "bursts.cfg",
+    "heavy.cfg",
+    "simple.cfg",
+    "stalls.cfg",
+    "xxl_packets.cfg",
+  ]
+
+  protoc_target = "../../../gn:protoc($host_toolchain)"
+  protoc_out_dir = get_label_info(protoc_target, "root_out_dir")
+  protoc_rel_dir = rebase_path(protoc_out_dir, root_build_dir)
+  out_header = "$target_gen_dir/stress_test_config_blobs.h"
+  out_header_rel = rebase_path(out_header, root_build_dir)
+
+  deps = [ protoc_target ]
+  script = "../gen_configs_blob.py"
+  outputs = [ out_header ]
+  args = [
+    "--protoc=$protoc_rel_dir/protoc",
+    "--out=$out_header_rel",
+  ]
+  foreach(source, sources) {
+    args += [ rebase_path(source, root_build_dir) ]
+  }
+  public_configs = [ ":include_path" ]
+}
diff --git a/test/stress_test/configs/backfills.cfg b/test/stress_test/configs/backfills.cfg
new file mode 100644
index 0000000..24e1b17
--- /dev/null
+++ b/test/stress_test/configs/backfills.cfg
@@ -0,0 +1,24 @@
+# TODO(primiano): this fails with the errors below, investigate.
+# FAIL: TestEvent counter mismatch for sequence 2. Expected 100 got 99
+# FAIL: TestEvent counter mismatch for sequence 3. Expected 99 got 98
+# FAIL: TestEvent counter mismatch for sequence 4. Expected 106 got 105
+# FAIL: TestEvent counter mismatch for sequence 5. Expected 107 got 106
+# FAIL: TestEvent counter mismatch for sequence 6. Expected 102 got 101
+# FAIL: TestEvent counter mismatch for sequence 7. Expected 104 got 103
+# FAIL: TestEvent counter mismatch for sequence 8. Expected 111 got 110
+# FAIL: TestEvent counter mismatch for sequence 9. Expected 109 got 108
+
+num_processes: 1
+num_threads: 8
+
+steady_state_timings {
+  rate_mean: 100
+  payload_mean: 640
+  payload_write_time_ms: 100
+}
+
+trace_config {
+  duration_ms: 10000
+  buffers { size_kb: 500000 }
+  data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/bursts.cfg b/test/stress_test/configs/bursts.cfg
new file mode 100644
index 0000000..c8a0fdf
--- /dev/null
+++ b/test/stress_test/configs/bursts.cfg
@@ -0,0 +1,22 @@
+num_processes: 10
+num_threads: 1
+
+steady_state_timings {
+  rate_mean: 10
+  payload_mean: 128
+}
+
+# 250ms every 2s enter burst mode, bumping at 1000 events/s * 512 ~= 5 MB/s
+# (per thread) ~= 50 MB/s for the 10 processes.
+burst_period_ms: 2000
+burst_duration_ms: 250
+burst_timings {
+  rate_mean: 1000
+  payload_mean: 512
+}
+
+trace_config {
+  duration_ms: 2000
+  buffers { size_kb: 20000 }
+  data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/heavy.cfg b/test/stress_test/configs/heavy.cfg
new file mode 100644
index 0000000..c137d5c
--- /dev/null
+++ b/test/stress_test/configs/heavy.cfg
@@ -0,0 +1,27 @@
+num_processes: 32
+num_threads: 10
+nesting: 10
+
+steady_state_timings {
+  rate_mean: 10
+  payload_mean: 128
+}
+
+burst_period_ms: 2000
+burst_duration_ms: 250
+burst_timings {
+  rate_mean: 1000
+  payload_mean: 128
+}
+
+trace_config {
+  duration_ms: 5000
+  buffers { size_kb: 320000 }
+  data_sources { config { name: "perfetto.stress_test" } }
+
+  producers {
+    producer_name: "stress_producer"
+    shm_size_kb: 4000
+    page_size_kb: 4
+  }
+}
diff --git a/test/stress_test/configs/simple.cfg b/test/stress_test/configs/simple.cfg
new file mode 100644
index 0000000..4632574
--- /dev/null
+++ b/test/stress_test/configs/simple.cfg
@@ -0,0 +1,16 @@
+num_processes: 1
+num_threads: 1
+max_events: 1000
+nesting: 8
+
+# 500 events/s, ~128 bytes/event ~= 64 KB/s
+steady_state_timings {
+  rate_mean: 500
+  payload_mean: 128
+}
+
+trace_config {
+  duration_ms: 3000
+  buffers { size_kb: 8000 }
+  data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/stalls.cfg b/test/stress_test/configs/stalls.cfg
new file mode 100644
index 0000000..344194f
--- /dev/null
+++ b/test/stress_test/configs/stalls.cfg
@@ -0,0 +1,15 @@
+num_processes: 8
+num_threads: 4
+max_events: 10000
+
+# 1K events/s * 10K = 10 MB/s per thread
+steady_state_timings {
+  rate_mean: 1000
+  payload_mean: 10000
+}
+
+trace_config {
+  duration_ms: 5000
+  buffers { size_kb: 500000 }
+  data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/xxl_packets.cfg b/test/stress_test/configs/xxl_packets.cfg
new file mode 100644
index 0000000..fbafb2a
--- /dev/null
+++ b/test/stress_test/configs/xxl_packets.cfg
@@ -0,0 +1,22 @@
+# Four threads writing large and nested packets of 32MB each every second.
+
+num_processes: 1
+num_threads: 4
+max_events: 5
+nesting: 2
+
+# Each writer will write packets of 16 MB ((1 + nesting=1) x payload 8MB)
+steady_state_timings {
+  rate_mean: 1
+  payload_mean: 8000000
+  payload_write_time_ms: 100
+}
+
+trace_config {
+  duration_ms: 10000
+  buffers {
+    size_kb: 500000
+    fill_policy: DISCARD
+  }
+  data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/gen_configs_blob.py b/test/stress_test/gen_configs_blob.py
new file mode 100644
index 0000000..ba1f2c7
--- /dev/null
+++ b/test/stress_test/gen_configs_blob.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python3
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Compiles the stress_test configs protos and bundles in a .h C++ array.
+
+This scripts takes all the configs in /test/stress_test/configs, compiles them
+with protoc and generates a C++ header which contains the configs' names and
+proto-encoded bytes.
+
+This is invoked by the build system and is used by the stress_test runner. The
+goal is making the stress_test binary hermetic and not depend on the repo.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+import os
+import sys
+import argparse
+import shutil
+import subprocess
+
+CUR_DIR = os.path.dirname(os.path.realpath(__file__))
+ROOT_DIR = os.path.dirname(os.path.dirname(CUR_DIR))
+CONFIGS_DIR = os.path.join(CUR_DIR, 'configs')
+
+
+def find_protoc():
+  for root, _, files in os.walk(os.path.join(ROOT_DIR, 'out')):
+    if 'protoc' in files:
+      return os.path.join(root, 'protoc')
+  return None
+
+
+def main():
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--protoc')
+  parser.add_argument('--out', required=True)
+  parser.add_argument('cfgfiles', nargs='+')
+  args = parser.parse_args()
+
+  protoc = args.protoc or find_protoc()
+  assert protoc, 'protoc not found, pass --protoc /path/to/protoc'
+  assert os.path.exists(protoc), '{} does not exist'.format(protoc)
+  if protoc is not args.protoc:
+    print('Using protoc: {}'.format(protoc))
+
+  blobs = {}
+  for cfg_path in args.cfgfiles:
+    cfg_name = os.path.splitext(cfg_path)[0].split(os.sep)[-1]
+    with open(cfg_path, 'r') as in_file:
+      compiled_proto = subprocess.check_output([
+          protoc,
+          '--encode=perfetto.protos.StressTestConfig',
+          '--proto_path=' + ROOT_DIR,
+          os.path.join(ROOT_DIR, 'protos', 'perfetto', 'config',
+                       'stress_test_config.proto'),
+      ],
+                                               stdin=in_file)
+    blobs[cfg_name] = bytearray(compiled_proto)
+
+  # Write the C++ header file
+  fout = open(args.out, 'wb')
+  include_guard = args.out.replace('/', '_').replace('.', '_').upper() + '_'
+  fout.write("""
+#ifndef {include_guard}
+#define {include_guard}
+
+#include <stddef.h>
+#include <stdint.h>
+
+// This file was autogenerated by ${gen_script}. Do not edit.
+
+namespace perfetto {{
+namespace {{
+
+struct StressTestConfigBlob {{
+  const char* name;
+  const uint8_t* data;
+  size_t size;
+}};\n\n""".format(
+      gen_script=__file__,
+      include_guard=include_guard,
+  ).encode())
+
+  configs_arr = '\nconst StressTestConfigBlob kStressTestConfigs[] = {\n'
+  for cfg_name, blob in blobs.items():
+    arr_str = ','.join(str(b) for b in blob)
+    line = 'const uint8_t _config_%s[]{%s};\n' % (cfg_name, arr_str)
+    fout.write(line.encode())
+    configs_arr += '  {{"{n}", _config_{n}, sizeof(_config_{n})}},\n'.format(
+        n=cfg_name)
+  configs_arr += '};\n'
+  fout.write(configs_arr.encode())
+  fout.write("""
+}  // namespace
+}  // namespace perfetto
+#endif\n""".encode())
+  fout.close()
+
+
+if __name__ == '__main__':
+  exit(main())
diff --git a/test/stress_test/stress_producer.cc b/test/stress_test/stress_producer.cc
new file mode 100644
index 0000000..2845efc
--- /dev/null
+++ b/test/stress_test/stress_producer.cc
@@ -0,0 +1,224 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <math.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <list>
+#include <random>
+#include <thread>
+
+#include "perfetto/base/time.h"
+#include "perfetto/ext/base/file_utils.h"
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/tracing.h"
+
+#include "protos/perfetto/config/stress_test_config.gen.h"
+#include "protos/perfetto/trace/test_event.pbzero.h"
+
+using StressTestConfig = perfetto::protos::gen::StressTestConfig;
+
+namespace perfetto {
+namespace {
+
+StressTestConfig* g_cfg;
+
+class StressTestDataSource : public DataSource<StressTestDataSource> {
+ public:
+  constexpr static BufferExhaustedPolicy kBufferExhaustedPolicy =
+      BufferExhaustedPolicy::kStall;
+
+  void OnSetup(const SetupArgs& args) override;
+  void OnStart(const StartArgs&) override;
+  void OnStop(const StopArgs&) override;
+
+ private:
+  class Worker {
+   public:
+    explicit Worker(uint32_t id) : id_(id) {}
+    void Start();
+    void Stop();
+    ~Worker() { Stop(); }
+
+   private:
+    void WorkerMain(uint32_t worker_id);
+    void FillPayload(const StressTestConfig::WriterTiming&,
+                     uint32_t seq,
+                     uint32_t nesting,
+                     protos::pbzero::TestEvent::TestPayload*);
+
+    const uint32_t id_;
+    std::thread thread_;
+    std::atomic<bool> quit_;
+    std::minstd_rand0 rnd_seq_;
+
+    // Use a different engine for the generation of random value, keep rnd_seq_
+    // dedicated to generating deterministic sequences.
+    std::minstd_rand0 rnd_gen_;
+  };
+
+  std::list<Worker> workers_;
+};
+
+// Called before the tracing session starts.
+void StressTestDataSource::OnSetup(const SetupArgs&) {
+  for (uint32_t i = 0; i < std::max(g_cfg->num_threads(), 1u); ++i)
+    workers_.emplace_back(i);
+}
+
+// Called when the tracing session starts.
+void StressTestDataSource::OnStart(const StartArgs&) {
+  for (auto& worker : workers_)
+    worker.Start();
+}
+
+// Called when the tracing session ends.
+void StressTestDataSource::OnStop(const StopArgs&) {
+  for (auto& worker : workers_)
+    worker.Stop();
+  workers_.clear();
+}
+
+void StressTestDataSource::Worker::Start() {
+  quit_.store(false);
+  thread_ = std::thread(&StressTestDataSource::Worker::WorkerMain, this, id_);
+}
+
+void StressTestDataSource::Worker::Stop() {
+  if (!thread_.joinable() || quit_)
+    return;
+  PERFETTO_DLOG("Stopping worker %u", id_);
+  quit_.store(true);
+  thread_.join();
+}
+
+void StressTestDataSource::Worker::WorkerMain(uint32_t worker_id) {
+  PERFETTO_DLOG("Worker %u starting", worker_id);
+  rnd_seq_ = std::minstd_rand0(0);
+  int64_t t_start = base::GetBootTimeNs().count();
+  int64_t num_msgs = 0;
+
+  const int64_t max_msgs = g_cfg->max_events()
+                               ? static_cast<int64_t>(g_cfg->max_events())
+                               : INT64_MAX;
+  bool is_last = false;
+  while (!is_last) {
+    is_last = quit_ || ++num_msgs >= max_msgs;
+
+    const int64_t now = base::GetBootTimeNs().count();
+    const auto elapsed_ms = static_cast<uint64_t>((now - t_start) / 1000000);
+
+    const auto* timings = &g_cfg->steady_state_timings();
+    if (g_cfg->burst_period_ms() &&
+        elapsed_ms % g_cfg->burst_period_ms() >
+            (g_cfg->burst_period_ms() - g_cfg->burst_duration_ms())) {
+      timings = &g_cfg->burst_timings();
+    }
+    std::normal_distribution<> rate_dist{timings->rate_mean(),
+                                         timings->rate_stddev()};
+
+    double period_ns = 1e9 / rate_dist(rnd_gen_);
+    period_ns = isnan(period_ns) || period_ns == 0.0 ? 1 : period_ns;
+    double expected_msgs = static_cast<double>(now - t_start) / period_ns;
+    int64_t delay_ns = 0;
+    if (static_cast<int64_t>(expected_msgs) < num_msgs)
+      delay_ns = static_cast<int64_t>(period_ns);
+    std::this_thread::sleep_for(
+        std::chrono::nanoseconds(static_cast<int64_t>(delay_ns)));
+
+    StressTestDataSource::Trace([&](StressTestDataSource::TraceContext ctx) {
+      const uint32_t seq = static_cast<uint32_t>(rnd_seq_());
+      auto packet = ctx.NewTracePacket();
+      packet->set_timestamp(static_cast<uint64_t>(now));
+      auto* test_event = packet->set_for_testing();
+      test_event->set_seq_value(seq);
+      test_event->set_counter(static_cast<uint64_t>(num_msgs));
+      if (is_last)
+        test_event->set_is_last(true);
+
+      FillPayload(*timings, seq, g_cfg->nesting(), test_event->set_payload());
+    });  // Trace().
+
+  }  // while (!quit)
+  PERFETTO_DLOG("Worker done");
+}
+
+void StressTestDataSource::Worker::FillPayload(
+    const StressTestConfig::WriterTiming& timings,
+    uint32_t seq,
+    uint32_t nesting,
+    protos::pbzero::TestEvent::TestPayload* payload) {
+  // Write the payload in two halves, optionally with some delay in the
+  // middle.
+  std::normal_distribution<> msg_size_dist{timings.payload_mean(),
+                                           timings.payload_stddev()};
+  auto payload_size =
+      static_cast<uint32_t>(std::max(std::round(msg_size_dist(rnd_gen_)), 0.0));
+  std::string buf;
+  buf.resize(payload_size / 2);
+  for (size_t i = 0; i < buf.size(); ++i) {
+    buf[i] = static_cast<char>(33 + ((seq + i) % 64));  // Stay ASCII.
+  }
+  payload->add_str(buf);
+  payload->set_remaining_nesting_depth(nesting);
+  if (timings.payload_write_time_ms() > 0) {
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(timings.payload_write_time_ms()));
+  }
+
+  if (nesting > 0)
+    FillPayload(timings, seq, nesting - 1, payload->add_nested());
+
+  payload->add_str(buf);
+}
+}  // namespace
+
+PERFETTO_DEFINE_DATA_SOURCE_STATIC_MEMBERS(StressTestDataSource);
+
+}  // namespace perfetto
+
+int main() {
+  perfetto::TracingInitArgs args;
+  args.backends = perfetto::kSystemBackend;
+
+  std::string config_blob;
+  if (isatty(STDIN_FILENO))
+    PERFETTO_LOG("Reading StressTestConfig proto from stdin");
+  perfetto::base::ReadFileStream(stdin, &config_blob);
+
+  StressTestConfig cfg;
+  perfetto::g_cfg = &cfg;
+  if (config_blob.empty() || !cfg.ParseFromString(config_blob))
+    PERFETTO_FATAL("A StressTestConfig blob must be passed into stdin");
+
+  if (cfg.shmem_page_size_kb())
+    args.shmem_page_size_hint_kb = cfg.shmem_page_size_kb();
+  if (cfg.shmem_size_kb())
+    args.shmem_page_size_hint_kb = cfg.shmem_size_kb();
+
+  perfetto::Tracing::Initialize(args);
+  perfetto::DataSourceDescriptor dsd;
+  dsd.set_name("perfetto.stress_test");
+  perfetto::StressTestDataSource::Register(dsd);
+
+  for (;;) {
+    pause();
+  }
+}
diff --git a/test/stress_test/stress_test.cc b/test/stress_test/stress_test.cc
new file mode 100644
index 0000000..d66039e
--- /dev/null
+++ b/test/stress_test/stress_test.cc
@@ -0,0 +1,497 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#include <stdarg.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+#include <chrono>
+#include <list>
+#include <map>
+#include <random>
+#include <regex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/base/subprocess.h"
+#include "perfetto/ext/base/temp_file.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "perfetto/tracing.h"
+#include "perfetto/tracing/core/forward_decls.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "src/base/test/utils.h"
+
+#include "protos/perfetto/config/stress_test_config.gen.h"
+#include "protos/perfetto/trace/test_event.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+
+// Generated by gen_configs_blob.py. It defines the kStressTestConfigs array,
+// which contains a proto-encoded StressTestConfig message for each .cfg file
+// listed in /test/stress_test/configs/BUILD.gn.
+#include "test/stress_test/configs/stress_test_config_blobs.h"
+
+namespace perfetto {
+namespace {
+
+using StressTestConfig = protos::gen::StressTestConfig;
+
+struct SigHandlerCtx {
+  std::atomic<bool> aborted{};
+  std::vector<int> pids_to_kill;
+};
+SigHandlerCtx* g_sig;
+
+struct TestResult {
+  const char* cfg_name = nullptr;
+  StressTestConfig cfg;
+  uint32_t run_time_ms = 0;
+  uint32_t trace_size_kb = 0;
+  uint32_t num_packets = 0;
+  uint32_t num_threads = 0;
+  uint32_t num_errors = 0;
+  base::Subprocess::ResourceUsage svc_rusage;
+  base::Subprocess::ResourceUsage prod_rusage;
+};
+
+struct ParsedTraceStats {
+  struct WriterThread {
+    uint64_t packets_seen = 0;
+    bool last_seen = false;
+    uint32_t last_seq = 0;
+    uint64_t seq_errors = 0;
+    uint64_t counter_errors = 0;
+    std::minstd_rand0 rnd_engine;
+  };
+
+  // One for each trusted_packet_sequence_id.
+  std::map<uint32_t, WriterThread> threads;
+};
+
+class TestHarness {
+ public:
+  TestHarness();
+  void RunConfig(const char* cfg_name, const StressTestConfig&, bool verbose);
+  const std::list<TestResult>& test_results() const { return test_results_; }
+
+ private:
+  void ReadbackTrace(const std::string&, ParsedTraceStats*);
+  void ParseTracePacket(const uint8_t*, size_t, ParsedTraceStats* ctx);
+  void AddFailure(const char* fmt, ...) PERFETTO_PRINTF_FORMAT(2, 3);
+
+  std::vector<std::string> env_;
+  std::list<TestResult> test_results_;
+  std::string results_dir_;
+  base::ScopedFile error_log_;
+};
+
+TestHarness::TestHarness() {
+  results_dir_ = base::GetSysTempDir() + "/perfetto-stress-test";
+  system(("rm -r -- \"" + results_dir_ + "\"").c_str());
+  PERFETTO_CHECK(mkdir(results_dir_.c_str(), 0755) == 0);
+  PERFETTO_LOG("Saving test results in %s", results_dir_.c_str());
+}
+
+void TestHarness::AddFailure(const char* fmt, ...) {
+  ++test_results_.back().num_errors;
+
+  char log_msg[512];
+  va_list args;
+  va_start(args, fmt);
+  int res = vsnprintf(log_msg, sizeof(log_msg), fmt, args);
+  va_end(args);
+
+  PERFETTO_ELOG("FAIL: %s", log_msg);
+
+  if (res > 0 && static_cast<size_t>(res) < sizeof(log_msg) - 2) {
+    log_msg[res++] = '\n';
+    log_msg[res++] = '\0';
+  }
+  base::ignore_result(write(*error_log_, log_msg, static_cast<size_t>(res)));
+}
+
+void TestHarness::RunConfig(const char* cfg_name,
+                            const StressTestConfig& cfg,
+                            bool verbose) {
+  test_results_.emplace_back();
+  TestResult& test_result = test_results_.back();
+  test_result.cfg_name = cfg_name;
+  test_result.cfg = cfg;
+  g_sig->pids_to_kill.clear();
+
+  auto result_dir = results_dir_ + "/" + cfg_name;
+  PERFETTO_CHECK(!mkdir(result_dir.c_str(), 0755));
+  error_log_ = base::OpenFile(result_dir + "/errors.log",
+                              O_RDWR | O_CREAT | O_TRUNC, 0644);
+
+  PERFETTO_ILOG("Starting \"%s\" - %s", cfg_name, result_dir.c_str());
+
+  env_.emplace_back("PERFETTO_PRODUCER_SOCK_NAME=" + result_dir +
+                    "/producer.sock");
+  env_.emplace_back("PERFETTO_CONSUMER_SOCK_NAME=" + result_dir +
+                    "/consumer.sock");
+  std::string bin_dir = base::GetCurExecutableDir();
+
+  // Start the service.
+  base::Subprocess traced({bin_dir + "/traced"});
+  traced.args.env = env_;
+  if (!verbose) {
+    traced.args.out_fd = base::OpenFile(result_dir + "/traced.log",
+                                        O_RDWR | O_CREAT | O_TRUNC, 0644);
+    traced.args.stderr_mode = traced.args.stdout_mode = base::Subprocess::kFd;
+  }
+  traced.Start();
+  g_sig->pids_to_kill.emplace_back(traced.pid());
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  PERFETTO_CHECK(traced.Poll() == base::Subprocess::kRunning);
+
+  // Start the producer processes.
+  std::list<base::Subprocess> producers;
+  for (uint32_t i = 0; i < cfg.num_processes(); ++i) {
+    producers.emplace_back(base::Subprocess({bin_dir + "/stress_producer"}));
+    auto& producer = producers.back();
+    producer.args.input = cfg.SerializeAsString();
+    if (!verbose) {
+      producer.args.out_fd =
+          base::OpenFile(result_dir + "/producer." + std::to_string(i) + ".log",
+                         O_RDWR | O_CREAT | O_TRUNC, 0644);
+      producer.args.stderr_mode = producer.args.stdout_mode =
+          base::Subprocess::kFd;
+    }
+    producer.args.env = env_;
+    producer.Start();
+    g_sig->pids_to_kill.emplace_back(producer.pid());
+  }
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  for (auto& producer : producers)
+    PERFETTO_CHECK(producer.Poll() == base::Subprocess::kRunning);
+
+  auto trace_file_path = result_dir + "/trace";
+  base::Subprocess consumer(
+      {bin_dir + "/perfetto", "-c", "-", "-o", trace_file_path.c_str()});
+  consumer.args.env = env_;
+  consumer.args.input = cfg.trace_config().SerializeAsString();
+  if (!verbose) {
+    consumer.args.out_fd = base::OpenFile(result_dir + "/perfetto.log",
+                                          O_RDWR | O_CREAT | O_TRUNC, 0644);
+    consumer.args.stderr_mode = consumer.args.stdout_mode =
+        base::Subprocess::kFd;
+  }
+  unlink(trace_file_path.c_str());
+  consumer.Start();
+  int64_t t_start = base::GetBootTimeNs().count();
+  g_sig->pids_to_kill.emplace_back(consumer.pid());
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  PERFETTO_CHECK(consumer.Poll() == base::Subprocess::kRunning);
+
+  if (!consumer.Wait(
+          static_cast<int>(cfg.trace_config().duration_ms() + 30000))) {
+    AddFailure("Consumer didn't quit in time");
+    consumer.KillAndWaitForTermination();
+  }
+
+  // Stop
+  consumer.KillAndWaitForTermination(SIGTERM);
+  int64_t t_end = base::GetBootTimeNs().count();
+
+  for (auto& producer : producers) {
+    producer.KillAndWaitForTermination();
+    test_result.prod_rusage = producer.rusage();  // Only keep last one
+  }
+  producers.clear();
+  traced.KillAndWaitForTermination();
+
+  test_result.svc_rusage = traced.rusage();
+  test_result.run_time_ms = static_cast<uint32_t>((t_end - t_start) / 1000000);
+
+  // Verify
+  // TODO(primiano): read back the TraceStats and check them as well.
+  ParsedTraceStats ctx;
+  ReadbackTrace(trace_file_path, &ctx);
+  auto exp_thd = cfg.num_processes() * cfg.num_threads();
+  if (ctx.threads.size() != exp_thd) {
+    AddFailure("Trace threads mismatch. Expected %u threads, got %zu", exp_thd,
+               ctx.threads.size());
+  }
+  for (const auto& it : ctx.threads) {
+    uint32_t seq_id = it.first;
+    const auto& thd = it.second;
+    if (!thd.last_seen) {
+      AddFailure("Last packet not seen for sequence %u", seq_id);
+    }
+    if (thd.seq_errors > 0) {
+      AddFailure("Sequence %u had %" PRIu64 " packets out of sync", seq_id,
+                 thd.seq_errors);
+    }
+    if (thd.counter_errors > 0) {
+      AddFailure("Sequence %u had %" PRIu64 " packets counter errors", seq_id,
+                 thd.counter_errors);
+    }
+  }
+
+  error_log_.reset();
+  PERFETTO_ILOG("Completed \"%s\"", cfg_name);
+}
+
+void TestHarness::ReadbackTrace(const std::string& trace_file_path,
+                                ParsedTraceStats* ctx) {
+  TestResult& test_result = test_results_.back();
+  using namespace protozero::proto_utils;
+  auto fd = base::OpenFile(trace_file_path.c_str(), O_RDONLY);
+  if (!fd)
+    return AddFailure("Trace file does not exist");
+  const off_t file_size = lseek(*fd, 0, SEEK_END);
+  if (file_size <= 0)
+    return AddFailure("Trace file is empty");
+  test_result.trace_size_kb = static_cast<uint32_t>(file_size / 1000);
+  const uint8_t* const start = static_cast<const uint8_t*>(mmap(
+      nullptr, static_cast<size_t>(file_size), PROT_READ, MAP_PRIVATE, *fd, 0));
+  PERFETTO_CHECK(start != MAP_FAILED);
+  const uint8_t* const end = start + file_size;
+
+  constexpr uint8_t kTracePacketTag = MakeTagLengthDelimited(1);
+
+  for (auto* ptr = start; (end - ptr) > 2;) {
+    const uint8_t* tokenizer_start = ptr;
+    if (*(ptr++) != kTracePacketTag) {
+      return AddFailure("Tokenizer failure at offset %zd", ptr - start);
+    }
+    uint64_t packet_size = 0;
+    ptr = ParseVarInt(ptr, end, &packet_size);
+    const uint8_t* const packet_start = ptr;
+    ptr += packet_size;
+    if ((ptr - tokenizer_start) < 2 || ptr > end)
+      return AddFailure("Got invalid packet size %" PRIu64 " at offset %zd",
+                        packet_size,
+                        static_cast<ssize_t>(packet_start - start));
+    ParseTracePacket(packet_start, static_cast<size_t>(packet_size), ctx);
+  }
+  test_result.num_threads = static_cast<uint32_t>(ctx->threads.size());
+}
+
+void TestHarness::ParseTracePacket(const uint8_t* start,
+                                   size_t size,
+                                   ParsedTraceStats* ctx) {
+  TestResult& test_result = test_results_.back();
+  protos::pbzero::TracePacket::Decoder packet(start, size);
+  if (!packet.has_for_testing())
+    return;
+
+  ++test_result.num_packets;
+  const uint32_t seq_id = packet.trusted_packet_sequence_id();
+
+  protos::pbzero::TestEvent::Decoder te(packet.for_testing());
+  auto t_it = ctx->threads.find(seq_id);
+  bool is_first_packet = false;
+  if (t_it == ctx->threads.end()) {
+    is_first_packet = true;
+    t_it = ctx->threads.emplace(seq_id, ParsedTraceStats::WriterThread()).first;
+  }
+  ParsedTraceStats::WriterThread& thd = t_it->second;
+
+  ++thd.packets_seen;
+  if (te.is_last()) {
+    if (thd.last_seen) {
+      return AddFailure(
+          "last_seen=true happened more than once for sequence %u", seq_id);
+    } else {
+      thd.last_seen = true;
+    }
+  }
+  if (is_first_packet) {
+    thd.rnd_engine = std::minstd_rand0(te.seq_value());
+  } else {
+    const uint32_t expected = static_cast<uint32_t>(thd.rnd_engine());
+    if (te.seq_value() != expected) {
+      thd.rnd_engine = std::minstd_rand0(te.seq_value());  // Resync the engine.
+      ++thd.seq_errors;
+      return AddFailure(
+          "TestEvent seq mismatch for sequence %u. Expected %u got %u", seq_id,
+          expected, te.seq_value());
+    }
+    if (te.counter() != thd.packets_seen) {
+      return AddFailure(
+          "TestEvent counter mismatch for sequence %u. Expected %" PRIu64
+          " got %" PRIu64,
+          seq_id, thd.packets_seen, te.counter());
+    }
+  }
+
+  if (!te.has_payload()) {
+    return AddFailure("TestEvent %u for sequence %u has no payload",
+                      te.seq_value(), seq_id);
+  }
+
+  // Check the validity of the payload. The payload might be nested. If that is
+  // the case, we need to check all levels.
+  protozero::ConstBytes payload_bounds = te.payload();
+  for (uint32_t depth = 0, last_depth = 0;; depth++) {
+    if (depth > 100) {
+      return AddFailure("Unexpectedly deep depth for event %u, sequence %u",
+                        te.seq_value(), seq_id);
+    }
+    protos::pbzero::TestEvent::TestPayload::Decoder payload(payload_bounds);
+    const uint32_t rem_depth = payload.remaining_nesting_depth();
+
+    // The payload is a repeated field and must have exactly two instances.
+    // The writer splits it always in two halves of identical size.
+    int num_payload_pieces = 0;
+    size_t last_size = 0;
+    for (auto it = payload.str(); it; ++it, ++num_payload_pieces) {
+      protozero::ConstChars payload_str = *it;
+      last_size = last_size ? last_size : payload_str.size;
+      if (payload_str.size != last_size) {
+        return AddFailure(
+            "Asymmetrical payload at depth %u, event id %u, sequence %u. "
+            "%zu != %zu",
+            depth, te.seq_value(), seq_id, last_size, payload_str.size);
+      }
+      // Check that the payload content matches the expected sequence.
+      for (size_t i = 0; i < payload_str.size; i++) {
+        char exp = static_cast<char>(33 + ((te.seq_value() + i) % 64));
+        if (payload_str.data[i] != exp) {
+          return AddFailure(
+              "Payload mismatch at %zu, depth %u, event id %u, sequence %u. "
+              "Expected: 0x%x, Actual: 0x%x",
+              i, depth, te.seq_value(), seq_id, exp, payload_str.data[i]);
+        }
+      }
+    }
+    if (num_payload_pieces != 2) {
+      return AddFailure(
+          "Broken payload at depth %u, event id %u, sequence %u. "
+          "Expecting 2 repeated str fields, got %d",
+          depth, te.seq_value(), seq_id, num_payload_pieces);
+    }
+
+    if (depth > 0 && rem_depth != last_depth - 1) {
+      return AddFailure(
+          "Unexpected nesting level (expected: %u, actual: %u) at depth %u, "
+          "event id %u, sequence %u",
+          rem_depth, last_depth - 1, depth, te.seq_value(), seq_id);
+    }
+
+    last_depth = rem_depth;
+    if (rem_depth == 0)
+      break;
+    if (payload.has_nested()) {
+      payload_bounds = *payload.nested();
+    } else {
+      payload_bounds = {nullptr, 0};
+    }
+  }
+}
+
+void CtrlCHandler(int) {
+  g_sig->aborted.store(true);
+  for (auto it = g_sig->pids_to_kill.rbegin(); it != g_sig->pids_to_kill.rend();
+       it++) {
+    kill(*it, SIGKILL);
+  }
+}
+
+void StressTestMain(int argc, char** argv) {
+  TestHarness th;
+  std::regex filter;
+  bool has_filter = false;
+
+  bool verbose = false;
+  for (int i = 1; i < argc; ++i) {
+    if (!strcmp(argv[i], "-v")) {
+      verbose = true;
+    } else {
+      filter = std::regex(argv[i], std::regex::ECMAScript | std::regex::icase);
+      has_filter = true;
+    }
+  }
+
+  g_sig = new SigHandlerCtx();
+  signal(SIGINT, CtrlCHandler);
+
+  for (size_t i = 0; i < base::ArraySize(kStressTestConfigs) && !g_sig->aborted;
+       ++i) {
+    const auto& cfg_blob = kStressTestConfigs[i];
+    StressTestConfig cfg;
+    std::cmatch ignored;
+    if (has_filter && !std::regex_search(cfg_blob.name, ignored, filter)) {
+      continue;
+    }
+    PERFETTO_CHECK(cfg.ParseFromArray(cfg_blob.data, cfg_blob.size));
+    th.RunConfig(cfg_blob.name, cfg, verbose);
+  }
+
+  for (const auto& tres : th.test_results()) {
+    const auto& cfg = tres.cfg;
+    printf("===============================================================\n");
+    printf("Config: %s\n", tres.cfg_name);
+    printf("===============================================================\n");
+    printf("%-20s %-10s %-10s\n", "Metric", "Expected", "Actual");
+    printf("%-20s %-10s %-10s\n", "------", "--------", "------");
+    printf("%-20s %-10d %-10d\n", "#Errors", 0, tres.num_errors);
+    printf("%-20s %-10d %-10d \n", "Duration [ms]",
+           cfg.trace_config().duration_ms(), tres.run_time_ms);
+
+    uint32_t exp_threads = cfg.num_processes() * cfg.num_threads();
+    printf("%-20s %-10u %-10u\n", "Num threads", exp_threads, tres.num_threads);
+
+    double dur_s = cfg.trace_config().duration_ms() / 1e3;
+    double exp_per_thread = cfg.steady_state_timings().rate_mean() * dur_s;
+    if (cfg.burst_period_ms()) {
+      double burst_rate = 1.0 * cfg.burst_duration_ms() / cfg.burst_period_ms();
+      exp_per_thread *= 1.0 - burst_rate;
+      exp_per_thread += burst_rate * cfg.burst_timings().rate_mean() * dur_s;
+    }
+    if (cfg.max_events())
+      exp_per_thread = std::min(exp_per_thread, 1.0 * cfg.max_events());
+    double exp_packets = std::round(exp_per_thread * exp_threads);
+    printf("%-20s %-10.0f %-10d\n", "Num packets", exp_packets,
+           tres.num_packets);
+
+    double exp_size_kb = exp_packets * (cfg.nesting() + 1) *
+                         (cfg.steady_state_timings().payload_mean() + 40) /
+                         1000;
+    printf("%-20s ~%-9.0f %-10d\n", "Trace size [KB]", exp_size_kb,
+           tres.trace_size_kb);
+
+    double exp_rss_mb = cfg.trace_config().buffers()[0].size_kb() / 1000;
+    printf("%-20s (max) %-4.0f %-10d\n", "Svc RSS [MB]", exp_rss_mb,
+           tres.svc_rusage.max_rss_kb / 1000);
+    printf("%-20s %-10s %-10d\n", "Svc CPU [ms]", "---",
+           tres.svc_rusage.cpu_time_ms());
+    printf("%-20s %-10s %d / %d\n", "Svc #ctxswitch", "---",
+           tres.svc_rusage.invol_ctx_switch, tres.svc_rusage.vol_ctx_switch);
+
+    printf("%-20s %-10s %-10d\n", "Prod RSS [MB]", "---",
+           tres.prod_rusage.max_rss_kb / 1000);
+    printf("%-20s %-10s %-10d\n", "Prod CPU [ms]", "---",
+           tres.prod_rusage.cpu_time_ms());
+    printf("%-20s %-10s %d / %d\n", "Prod #ctxswitch", "---",
+           tres.prod_rusage.invol_ctx_switch, tres.prod_rusage.vol_ctx_switch);
+    printf("\n");
+  }
+}
+
+}  // namespace
+}  // namespace perfetto
+
+int main(int argc, char** argv) {
+  perfetto::StressTestMain(argc, argv);
+}