Tests: Add client library + service stress test
Add initial version of the client library & service stress test.
This tests tries to push the producer & service to its limits
by spawning a configurable number of processes and threads,
each writing events at a customizable rate.
Some work is still left, I still want to tune the configs and
reason on what to expect. Meanwhile checking in the harness.
Change-Id: Iea0b01e52018125e44f42c5b61c687c60315893b
diff --git a/Android.bp b/Android.bp
index 1da2804..37c42cf 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2155,6 +2155,7 @@
srcs: [
"protos/perfetto/config/chrome/chrome_config.proto",
"protos/perfetto/config/data_source_config.proto",
+ "protos/perfetto/config/stress_test_config.proto",
"protos/perfetto/config/test_config.proto",
"protos/perfetto/config/trace_config.proto",
],
@@ -2166,6 +2167,7 @@
out: [
"external/perfetto/protos/perfetto/config/chrome/chrome_config.gen.cc",
"external/perfetto/protos/perfetto/config/data_source_config.gen.cc",
+ "external/perfetto/protos/perfetto/config/stress_test_config.gen.cc",
"external/perfetto/protos/perfetto/config/test_config.gen.cc",
"external/perfetto/protos/perfetto/config/trace_config.gen.cc",
],
@@ -2177,6 +2179,7 @@
srcs: [
"protos/perfetto/config/chrome/chrome_config.proto",
"protos/perfetto/config/data_source_config.proto",
+ "protos/perfetto/config/stress_test_config.proto",
"protos/perfetto/config/test_config.proto",
"protos/perfetto/config/trace_config.proto",
],
@@ -2188,6 +2191,7 @@
out: [
"external/perfetto/protos/perfetto/config/chrome/chrome_config.gen.h",
"external/perfetto/protos/perfetto/config/data_source_config.gen.h",
+ "external/perfetto/protos/perfetto/config/stress_test_config.gen.h",
"external/perfetto/protos/perfetto/config/test_config.gen.h",
"external/perfetto/protos/perfetto/config/trace_config.gen.h",
],
@@ -2533,6 +2537,7 @@
srcs: [
"protos/perfetto/config/chrome/chrome_config.proto",
"protos/perfetto/config/data_source_config.proto",
+ "protos/perfetto/config/stress_test_config.proto",
"protos/perfetto/config/test_config.proto",
"protos/perfetto/config/trace_config.proto",
],
@@ -2543,6 +2548,7 @@
out: [
"external/perfetto/protos/perfetto/config/chrome/chrome_config.pb.cc",
"external/perfetto/protos/perfetto/config/data_source_config.pb.cc",
+ "external/perfetto/protos/perfetto/config/stress_test_config.pb.cc",
"external/perfetto/protos/perfetto/config/test_config.pb.cc",
"external/perfetto/protos/perfetto/config/trace_config.pb.cc",
],
@@ -2554,6 +2560,7 @@
srcs: [
"protos/perfetto/config/chrome/chrome_config.proto",
"protos/perfetto/config/data_source_config.proto",
+ "protos/perfetto/config/stress_test_config.proto",
"protos/perfetto/config/test_config.proto",
"protos/perfetto/config/trace_config.proto",
],
@@ -2564,6 +2571,7 @@
out: [
"external/perfetto/protos/perfetto/config/chrome/chrome_config.pb.h",
"external/perfetto/protos/perfetto/config/data_source_config.pb.h",
+ "external/perfetto/protos/perfetto/config/stress_test_config.pb.h",
"external/perfetto/protos/perfetto/config/test_config.pb.h",
"external/perfetto/protos/perfetto/config/trace_config.pb.h",
],
@@ -3133,6 +3141,7 @@
srcs: [
"protos/perfetto/config/chrome/chrome_config.proto",
"protos/perfetto/config/data_source_config.proto",
+ "protos/perfetto/config/stress_test_config.proto",
"protos/perfetto/config/test_config.proto",
"protos/perfetto/config/trace_config.proto",
],
@@ -3144,6 +3153,7 @@
out: [
"external/perfetto/protos/perfetto/config/chrome/chrome_config.pbzero.cc",
"external/perfetto/protos/perfetto/config/data_source_config.pbzero.cc",
+ "external/perfetto/protos/perfetto/config/stress_test_config.pbzero.cc",
"external/perfetto/protos/perfetto/config/test_config.pbzero.cc",
"external/perfetto/protos/perfetto/config/trace_config.pbzero.cc",
],
@@ -3155,6 +3165,7 @@
srcs: [
"protos/perfetto/config/chrome/chrome_config.proto",
"protos/perfetto/config/data_source_config.proto",
+ "protos/perfetto/config/stress_test_config.proto",
"protos/perfetto/config/test_config.proto",
"protos/perfetto/config/trace_config.proto",
],
@@ -3166,6 +3177,7 @@
out: [
"external/perfetto/protos/perfetto/config/chrome/chrome_config.pbzero.h",
"external/perfetto/protos/perfetto/config/data_source_config.pbzero.h",
+ "external/perfetto/protos/perfetto/config/stress_test_config.pbzero.h",
"external/perfetto/protos/perfetto/config/test_config.pbzero.h",
"external/perfetto/protos/perfetto/config/trace_config.pbzero.h",
],
diff --git a/BUILD b/BUILD
index b3c511a..576dc0e 100644
--- a/BUILD
+++ b/BUILD
@@ -1837,6 +1837,7 @@
srcs = [
"protos/perfetto/config/chrome/chrome_config.proto",
"protos/perfetto/config/data_source_config.proto",
+ "protos/perfetto/config/stress_test_config.proto",
"protos/perfetto/config/test_config.proto",
"protos/perfetto/config/trace_config.proto",
],
diff --git a/BUILD.gn b/BUILD.gn
index 0a1b23c..3db5eba 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -81,6 +81,7 @@
all_targets += [
":perfetto_integrationtests",
"test:client_api_example",
+ "test/stress_test",
]
}
diff --git a/include/perfetto/ext/base/subprocess.h b/include/perfetto/ext/base/subprocess.h
index d97ba5b..a57ada9 100644
--- a/include/perfetto/ext/base/subprocess.h
+++ b/include/perfetto/ext/base/subprocess.h
@@ -116,7 +116,8 @@
enum OutputMode {
kInherit = 0, // Inherit's the caller process stdout/stderr.
kDevNull, // dup() onto /dev/null
- kBuffer // dup() onto a pipe and move it into the output() buffer.
+ kBuffer, // dup() onto a pipe and move it into the output() buffer.
+ kFd, // dup() onto the passed args.fd.
};
// Input arguments for configuring the subprocess behavior.
@@ -153,10 +154,24 @@
OutputMode stdout_mode = kInherit;
OutputMode stderr_mode = kInherit;
+ base::ScopedFile out_fd;
+
// Returns " ".join(exec_cmd), quoting arguments.
std::string GetCmdString() const;
};
+ struct ResourceUsage {
+ uint32_t cpu_utime_ms = 0;
+ uint32_t cpu_stime_ms = 0;
+ uint32_t max_rss_kb = 0;
+ uint32_t min_page_faults = 0;
+ uint32_t maj_page_faults = 0;
+ uint32_t vol_ctx_switch = 0;
+ uint32_t invol_ctx_switch = 0;
+
+ uint32_t cpu_time_ms() const { return cpu_utime_ms + cpu_stime_ms; }
+ };
+
explicit Subprocess(std::initializer_list<std::string> exec_cmd = {});
Subprocess(Subprocess&&) noexcept;
Subprocess& operator=(Subprocess&&);
@@ -183,8 +198,8 @@
Status Poll();
- // Sends a SIGKILL and wait to see the process termination.
- void KillAndWaitForTermination();
+ // Sends a signal (SIGKILL if not specified) and wait for process termination.
+ void KillAndWaitForTermination(int sig_num = 0);
PlatformProcessId pid() const { return s_.pid; }
@@ -198,6 +213,7 @@
// This contains both stdout and stderr (if the corresponding _mode ==
// kBuffer). It's non-const so the caller can std::move() it.
std::string& output() { return s_.output; }
+ const ResourceUsage& rusage() const { return *s_.rusage; }
Args args;
@@ -222,6 +238,7 @@
int returncode = -1;
std::string output; // Stdin+stderr. Only when kBuffer.
std::thread waitpid_thread;
+ std::unique_ptr<ResourceUsage> rusage;
};
MovableState s_;
diff --git a/include/perfetto/ext/base/temp_file.h b/include/perfetto/ext/base/temp_file.h
index da7e1bf..3598868 100644
--- a/include/perfetto/ext/base/temp_file.h
+++ b/include/perfetto/ext/base/temp_file.h
@@ -24,6 +24,8 @@
namespace perfetto {
namespace base {
+std::string GetSysTempDir();
+
class TempFile {
public:
static TempFile CreateUnlinked();
diff --git a/protos/perfetto/config/BUILD.gn b/protos/perfetto/config/BUILD.gn
index 8a4a3b8..b17b961 100644
--- a/protos/perfetto/config/BUILD.gn
+++ b/protos/perfetto/config/BUILD.gn
@@ -37,6 +37,7 @@
sources = [
"chrome/chrome_config.proto",
"data_source_config.proto",
+ "stress_test_config.proto",
"test_config.proto",
"trace_config.proto",
]
diff --git a/protos/perfetto/config/stress_test_config.proto b/protos/perfetto/config/stress_test_config.proto
new file mode 100644
index 0000000..6ed8cbc
--- /dev/null
+++ b/protos/perfetto/config/stress_test_config.proto
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+import "protos/perfetto/config/trace_config.proto";
+
+package perfetto.protos;
+
+// This is the schema for the config files in /test/stress_test/configs/*.cfg.
+message StressTestConfig {
+ optional TraceConfig trace_config = 1;
+
+ // Shared Memory Buffer setup, passed as arguments to Tracing.Initialize().
+ optional uint32 shmem_size_kb = 2;
+ optional uint32 shmem_page_size_kb = 3;
+
+ // How many producer processes to spawn.
+ optional uint32 num_processes = 4;
+
+ // How many writer threads each producer process should spawn.
+ optional uint32 num_threads = 5;
+
+ // The producer will write events until one of the following is met:
+ // - trace_config.duration_ms is reached.
+ // - max_events is reached.
+ optional uint32 max_events = 6;
+
+ // If > 0 will write nested messages up to N levels deep. The size of each
+ // nested message depends on the payload_mean / sttdev arguments (below).
+ // This is to cover the patching logic.
+ optional uint32 nesting = 7;
+
+ // This submessage defines the timings of each writer worker thread.
+ message WriterTiming {
+ // The size of the payload written on each iteration.
+ optional double payload_mean = 1;
+ optional double payload_stddev = 2;
+
+ // The nominal event writing rate, expressed in events/sec.
+ // E.g. if payload_mean = 500 (bytes) and rate_mean = 1000 (Hz), each thread
+ // will write 500 KB / sec approximately (% stalling).
+ optional double rate_mean = 3;
+ optional double rate_stddev = 4;
+
+ // If non-zero each worker will slow down the writing of the payload:
+ // it writes half payload, sleep for payload_write_time_ms, then write the
+ // other half.
+ optional uint32 payload_write_time_ms = 5;
+ }
+
+ // The timings used by default.
+ optional WriterTiming steady_state_timings = 8;
+
+ // Optionally it is possible to cause a writer to enter "burst mode",
+ // simulating peaks of high-intensity writing. The way it works is the
+ // following: by default the writer writes events using the
+ // |steady_state_timings|. Then every |burst_period_ms| it will switch to the
+ // |burst_timings| for |burst_duration_ms|, and go back to the steady state
+ // after that (and then repeat).
+ optional uint32 burst_period_ms = 9;
+ optional uint32 burst_duration_ms = 10;
+ optional WriterTiming burst_timings = 11;
+}
diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto
index af6af01..0be07f5 100644
--- a/protos/perfetto/trace/perfetto_trace.proto
+++ b/protos/perfetto/trace/perfetto_trace.proto
@@ -7044,6 +7044,21 @@
// The current value of the random number sequence used in tests.
optional uint32 seq_value = 2;
+
+ // Monotonically increased on each packet.
+ optional uint64 counter = 3;
+
+ // No more packets should follow (from the current sequence).
+ optional bool is_last = 4;
+
+ message TestPayload {
+ repeated string str = 1;
+ repeated TestPayload nested = 2;
+
+ // When 0 this is the bottom-most nested message.
+ optional uint32 remaining_nesting_depth = 3;
+ }
+ optional TestPayload payload = 5;
}
// End of protos/perfetto/trace/test_event.proto
diff --git a/protos/perfetto/trace/test_event.proto b/protos/perfetto/trace/test_event.proto
index 7a6902b..c55b1a5 100644
--- a/protos/perfetto/trace/test_event.proto
+++ b/protos/perfetto/trace/test_event.proto
@@ -25,4 +25,19 @@
// The current value of the random number sequence used in tests.
optional uint32 seq_value = 2;
+
+ // Monotonically increased on each packet.
+ optional uint64 counter = 3;
+
+ // No more packets should follow (from the current sequence).
+ optional bool is_last = 4;
+
+ message TestPayload {
+ repeated string str = 1;
+ repeated TestPayload nested = 2;
+
+ // When 0 this is the bottom-most nested message.
+ optional uint32 remaining_nesting_depth = 3;
+ }
+ optional TestPayload payload = 5;
}
diff --git a/src/base/subprocess.cc b/src/base/subprocess.cc
index 0e1fbbd..7c2ec01 100644
--- a/src/base/subprocess.cc
+++ b/src/base/subprocess.cc
@@ -21,6 +21,7 @@
#include <poll.h>
#include <signal.h>
#include <stdio.h>
+#include <sys/resource.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
@@ -105,6 +106,10 @@
if (dup2(args->stdouterr_pipe_wr, STDOUT_FILENO) == -1)
die("Failed to dup2(STDOUT)");
break;
+ case Subprocess::kFd:
+ if (dup2(*args->create_args->out_fd, STDOUT_FILENO) == -1)
+ die("Failed to dup2(STDOUT)");
+ break;
}
switch (args->create_args->stderr_mode) {
@@ -119,6 +124,10 @@
if (dup2(args->stdouterr_pipe_wr, STDERR_FILENO) == -1)
die("Failed to dup2(STDERR)");
break;
+ case Subprocess::kFd:
+ if (dup2(*args->create_args->out_fd, STDERR_FILENO) == -1)
+ die("Failed to dup2(STDERR)");
+ break;
}
// Close all FDs % stdin/out/err and the ones that the client explicitly
@@ -173,7 +182,9 @@
Subprocess::Args::Args(Args&&) noexcept = default;
Subprocess::Args& Subprocess::Args::operator=(Args&&) = default;
-Subprocess::Subprocess(std::initializer_list<std::string> a) : args(a) {}
+Subprocess::Subprocess(std::initializer_list<std::string> a) : args(a) {
+ s_.rusage.reset(new ResourceUsage());
+}
Subprocess::Subprocess(Subprocess&& other) noexcept {
static_assert(sizeof(Subprocess) == sizeof(std::tuple<MovableState, Args>),
@@ -257,10 +268,24 @@
// Both ends of the pipe are closed after the thread.join().
int pid = s_.pid;
int exit_status_pipe_wr = s_.exit_status_pipe.wr.release();
- s_.waitpid_thread = std::thread([pid, exit_status_pipe_wr] {
+ auto* rusage = s_.rusage.get();
+ s_.waitpid_thread = std::thread([pid, exit_status_pipe_wr, rusage] {
int pid_stat = -1;
- int wait_res = PERFETTO_EINTR(waitpid(pid, &pid_stat, 0));
+ struct rusage usg {};
+ int wait_res = PERFETTO_EINTR(wait4(pid, &pid_stat, 0, &usg));
PERFETTO_CHECK(wait_res == pid);
+
+ auto tv_to_ms = [](const struct timeval& tv) {
+ return static_cast<uint32_t>(tv.tv_sec * 1000 + tv.tv_usec / 1000);
+ };
+ rusage->cpu_utime_ms = tv_to_ms(usg.ru_utime);
+ rusage->cpu_stime_ms = tv_to_ms(usg.ru_stime);
+ rusage->max_rss_kb = static_cast<uint32_t>(usg.ru_maxrss) / 1000;
+ rusage->min_page_faults = static_cast<uint32_t>(usg.ru_minflt);
+ rusage->maj_page_faults = static_cast<uint32_t>(usg.ru_majflt);
+ rusage->vol_ctx_switch = static_cast<uint32_t>(usg.ru_nvcsw);
+ rusage->invol_ctx_switch = static_cast<uint32_t>(usg.ru_nivcsw);
+
base::ignore_result(PERFETTO_EINTR(
write(exit_status_pipe_wr, &pid_stat, sizeof(pid_stat))));
PERFETTO_CHECK(close(exit_status_pipe_wr) == 0 || errno == EINTR);
@@ -430,8 +455,8 @@
}
}
-void Subprocess::KillAndWaitForTermination() {
- kill(s_.pid, SIGKILL);
+void Subprocess::KillAndWaitForTermination(int sig_num) {
+ kill(s_.pid, sig_num ? sig_num : SIGKILL);
Wait();
}
diff --git a/src/base/temp_file.cc b/src/base/temp_file.cc
index 59a5089..9df8a3a 100644
--- a/src/base/temp_file.cc
+++ b/src/base/temp_file.cc
@@ -22,27 +22,26 @@
#include <stdlib.h>
#include <unistd.h>
+#include "perfetto/ext/base/string_utils.h"
+
namespace perfetto {
namespace base {
-namespace {
+std::string GetSysTempDir() {
+ const char* tmpdir = getenv("TMPDIR");
+ if (tmpdir)
+ return base::StripSuffix(tmpdir, "/");
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
-constexpr char kSysTmpPath[] = "/data/local/tmp";
+ return "/data/local/tmp";
#else
-constexpr char kSysTmpPath[] = "/tmp";
+ return "/tmp";
#endif
-} // namespace
+}
// static
TempFile TempFile::Create() {
TempFile temp_file;
- const char* tmpdir = getenv("TMPDIR");
- if (tmpdir) {
- temp_file.path_.assign(tmpdir);
- } else {
- temp_file.path_.assign(kSysTmpPath);
- }
- temp_file.path_.append("/perfetto-XXXXXXXX");
+ temp_file.path_ = GetSysTempDir() + "/perfetto-XXXXXXXX";
temp_file.fd_.reset(mkstemp(&temp_file.path_[0]));
if (PERFETTO_UNLIKELY(!temp_file.fd_)) {
PERFETTO_FATAL("Could not create temp file %s", temp_file.path_.c_str());
@@ -81,8 +80,7 @@
// static
TempDir TempDir::Create() {
TempDir temp_dir;
- temp_dir.path_.assign(kSysTmpPath);
- temp_dir.path_.append("/perfetto-XXXXXXXX");
+ temp_dir.path_ = GetSysTempDir() + "/perfetto-XXXXXXXX";
PERFETTO_CHECK(mkdtemp(&temp_dir.path_[0]));
return temp_dir;
}
@@ -96,5 +94,4 @@
} // namespace base
} // namespace perfetto
-
#endif // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
diff --git a/test/stress_test/BUILD.gn b/test/stress_test/BUILD.gn
new file mode 100644
index 0000000..a5fa275
--- /dev/null
+++ b/test/stress_test/BUILD.gn
@@ -0,0 +1,47 @@
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../gn/perfetto.gni")
+
+executable("stress_test") {
+ testonly = true
+ sources = [ "stress_test.cc" ]
+ data_deps = [
+ "../../src/perfetto_cmd:perfetto",
+ "../../src/traced/service:traced",
+ ]
+ deps = [
+ ":stress_producer",
+ "../..:libperfetto_client_experimental",
+ "../../gn:default_deps",
+ "../../include/perfetto/tracing",
+ "../../protos/perfetto/trace:zero",
+ "../../src/base",
+ "../../src/base:test_support",
+ "configs",
+ ]
+}
+
+executable("stress_producer") {
+ testonly = true
+ sources = [ "stress_producer.cc" ]
+ deps = [
+ "../..:libperfetto_client_experimental",
+ "../../gn:default_deps",
+ "../../include/perfetto/tracing",
+ "../../protos/perfetto/trace:zero",
+ "../../src/base",
+ "../../src/base:test_support",
+ ]
+}
diff --git a/test/stress_test/README.md b/test/stress_test/README.md
new file mode 100644
index 0000000..70ee1c2
--- /dev/null
+++ b/test/stress_test/README.md
@@ -0,0 +1,134 @@
+# Perfetto Stress Test
+
+This is a test harness that to stress test the client library (DataSource-level
+only for now).
+
+The test is based on a number of configs in /test/stress_test/configs/*.cfg
+(NOTE: they must be listed in configs/BUILD.gn).
+The config is a /protos/perfetto/config/stress_test_config.proto message, which
+embeds the configuration of the test and a whole trace config.
+
+Each configs defines a testing scenario, determining the general trace config
+and all the settings of the test (e.g., how many producer processes to spawn,
+the write timings).
+
+The test is based on exec()-ing `traced` (the tracing service), `perfetto` (the
+consumer cmdline client) and a variable number of `stress_producer` instances.
+
+`stress_producer` emits events at a configurable rate, writing predictable
+sequences of numbers / string, so that the test harness can easily detect
+corruptions, out-of-order events or gaps.
+
+After running each test, the `stress_test` binary reads back the trace and
+performs a bunch of checks:
+
+- Checks that the number of sequences is exactly equal to #processes x #threads.
+- Checks that each sequence has all the expected packets in the right sequence
+- Checks the payload and correctness of proto nesting of each trace packet.
+- Reports CPU/Memory/Context-switch numbers for the service and producer
+ processes.
+
+Each test config is isolated from the others. All processes are killed and
+re-spawned for each test.
+
+The stdout/err of each process is saved in a dedicated /tmp/ folder, as well as
+the resulting trace.
+
+## Building and running the test
+
+```bash
+# This will recursively build traced, perfetto and stress_producer.
+ninja -C out/default stress_test
+
+out/default/stress_test
+```
+
+will output:
+
+```txt
+[307.909] stress_test.cc:116 Saving test results in /tmp/perfetto-ltIBJgA0
+
+===============================================================
+Config: simple
+===============================================================
+Metric Expected Actual
+------ -------- ------
+#Errors 0 0
+Duration [ms] 3000 3109
+Num threads 1 1
+Num packets 1000 1001
+Trace size [KB] 168 170
+Svc RSS [MB] 4 2
+Prod RSS [MB] --- 1
+Svc CPU [ms] --- 10
+Prod CPU [ms] --- 32
+Svc #ctxswitch --- 103 / 20
+Prod #ctxswitch --- 1022 / 1
+
+===============================================================
+Config: bursts
+===============================================================
+Metric Expected Actual
+------ -------- ------
+#Errors 0 0
+Duration [ms] 2000 2381
+Num threads 10 10
+Num packets 2675 20021
+Trace size [KB] 449 11063
+Svc RSS [MB] 32 17
+Prod RSS [MB] --- 1
+Svc CPU [ms] --- 98
+Prod CPU [ms] --- 17
+Svc #ctxswitch --- 704 / 1327
+Prod #ctxswitch --- 421 / 1
+```
+
+```bash
+$ ls -Rlh /tmp/perfetto-ltIBJgA0
+total 0
+drwxr-xr-x 16 primiano wheel 512B 5 Aug 09:16 bursts
+drwxr-xr-x 9 primiano wheel 288B 5 Aug 09:16 simple
+drwxr-xr-x 38 primiano wheel 1.2K 5 Aug 09:16 the_storm
+
+/tmp/perfetto-ltIBJgA0/bursts:
+total 22752
+-rw-r--r-- 1 primiano wheel 0B 5 Aug 09:16 errors.log
+-rw-r--r-- 1 primiano wheel 180B 5 Aug 09:16 perfetto.log
+-rw-r--r-- 1 primiano wheel 441B 5 Aug 09:16 producer.0.log
+...
+-rw-r--r-- 1 primiano wheel 441B 5 Aug 09:16 producer.9.log
+-rw------- 1 primiano wheel 11M 5 Aug 09:16 trace
+-rw-r--r-- 1 primiano wheel 407B 5 Aug 09:16 traced.log
+
+/tmp/perfetto-ltIBJgA0/simple:
+total 400
+srwxr-xr-x 1 primiano wheel 0B 5 Aug 09:16 consumer.sock
+-rw-r--r-- 1 primiano wheel 0B 5 Aug 09:16 errors.log
+-rw-r--r-- 1 primiano wheel 178B 5 Aug 09:16 perfetto.log
+-rw-r--r-- 1 primiano wheel 0B 5 Aug 09:16 producer.0.log
+srwxr-xr-x 1 primiano wheel 0B 5 Aug 09:16 producer.sock
+-rw------- 1 primiano wheel 167K 5 Aug 09:16 trace
+-rw-r--r-- 1 primiano wheel 406B 5 Aug 09:16 traced.log
+
+/tmp/perfetto-ltIBJgA0/the_storm:
+total 524432
+-rw-r--r-- 1 primiano wheel 0B 5 Aug 09:16 errors.log
+-rw-r--r-- 1 primiano wheel 184B 5 Aug 09:16 perfetto.log
+-rw-r--r-- 1 primiano wheel 0B 5 Aug 09:16 producer.0.log
+...
+-rw-r--r-- 1 primiano wheel 0B 5 Aug 09:16 producer.127.log
+-rw------- 1 primiano wheel 248M 5 Aug 09:16 trace
+-rw-r--r-- 1 primiano wheel 408B 5 Aug 09:16 traced.log
+```
+
+## TODOs
+
+The following scenarios requires more coverage:
+
+- Nested messages.
+- Force losses and check that the last_dropped flag is consistent.
+- Flushes and scraping.
+- Report data losses in the test output.
+- Multibuffer scenarios.
+- write_into_file=true.
+- Vary page size, smb size.
diff --git a/test/stress_test/configs/BUILD.gn b/test/stress_test/configs/BUILD.gn
new file mode 100644
index 0000000..6a12a09
--- /dev/null
+++ b/test/stress_test/configs/BUILD.gn
@@ -0,0 +1,50 @@
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../../gn/perfetto.gni")
+
+config("include_path") {
+ include_dirs = [ target_gen_dir ]
+}
+
+action("configs") {
+ testonly = true
+
+ sources = [
+ "backfills.cfg",
+ "bursts.cfg",
+ "heavy.cfg",
+ "simple.cfg",
+ "stalls.cfg",
+ "xxl_packets.cfg",
+ ]
+
+ protoc_target = "../../../gn:protoc($host_toolchain)"
+ protoc_out_dir = get_label_info(protoc_target, "root_out_dir")
+ protoc_rel_dir = rebase_path(protoc_out_dir, root_build_dir)
+ out_header = "$target_gen_dir/stress_test_config_blobs.h"
+ out_header_rel = rebase_path(out_header, root_build_dir)
+
+ deps = [ protoc_target ]
+ script = "../gen_configs_blob.py"
+ outputs = [ out_header ]
+ args = [
+ "--protoc=$protoc_rel_dir/protoc",
+ "--out=$out_header_rel",
+ ]
+ foreach(source, sources) {
+ args += [ rebase_path(source, root_build_dir) ]
+ }
+ public_configs = [ ":include_path" ]
+}
diff --git a/test/stress_test/configs/backfills.cfg b/test/stress_test/configs/backfills.cfg
new file mode 100644
index 0000000..24e1b17
--- /dev/null
+++ b/test/stress_test/configs/backfills.cfg
@@ -0,0 +1,24 @@
+# TODO(primiano): this fails with the errors below, investigate.
+# FAIL: TestEvent counter mismatch for sequence 2. Expected 100 got 99
+# FAIL: TestEvent counter mismatch for sequence 3. Expected 99 got 98
+# FAIL: TestEvent counter mismatch for sequence 4. Expected 106 got 105
+# FAIL: TestEvent counter mismatch for sequence 5. Expected 107 got 106
+# FAIL: TestEvent counter mismatch for sequence 6. Expected 102 got 101
+# FAIL: TestEvent counter mismatch for sequence 7. Expected 104 got 103
+# FAIL: TestEvent counter mismatch for sequence 8. Expected 111 got 110
+# FAIL: TestEvent counter mismatch for sequence 9. Expected 109 got 108
+
+num_processes: 1
+num_threads: 8
+
+steady_state_timings {
+ rate_mean: 100
+ payload_mean: 640
+ payload_write_time_ms: 100
+}
+
+trace_config {
+ duration_ms: 10000
+ buffers { size_kb: 500000 }
+ data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/bursts.cfg b/test/stress_test/configs/bursts.cfg
new file mode 100644
index 0000000..c8a0fdf
--- /dev/null
+++ b/test/stress_test/configs/bursts.cfg
@@ -0,0 +1,22 @@
+num_processes: 10
+num_threads: 1
+
+steady_state_timings {
+ rate_mean: 10
+ payload_mean: 128
+}
+
+# 250ms every 2s enter burst mode, bumping at 1000 events/s * 512 ~= 5 MB/s
+# (per thread) ~= 50 MB/s for the 10 processes.
+burst_period_ms: 2000
+burst_duration_ms: 250
+burst_timings {
+ rate_mean: 1000
+ payload_mean: 512
+}
+
+trace_config {
+ duration_ms: 2000
+ buffers { size_kb: 20000 }
+ data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/heavy.cfg b/test/stress_test/configs/heavy.cfg
new file mode 100644
index 0000000..c137d5c
--- /dev/null
+++ b/test/stress_test/configs/heavy.cfg
@@ -0,0 +1,27 @@
+num_processes: 32
+num_threads: 10
+nesting: 10
+
+steady_state_timings {
+ rate_mean: 10
+ payload_mean: 128
+}
+
+burst_period_ms: 2000
+burst_duration_ms: 250
+burst_timings {
+ rate_mean: 1000
+ payload_mean: 128
+}
+
+trace_config {
+ duration_ms: 5000
+ buffers { size_kb: 320000 }
+ data_sources { config { name: "perfetto.stress_test" } }
+
+ producers {
+ producer_name: "stress_producer"
+ shm_size_kb: 4000
+ page_size_kb: 4
+ }
+}
diff --git a/test/stress_test/configs/simple.cfg b/test/stress_test/configs/simple.cfg
new file mode 100644
index 0000000..4632574
--- /dev/null
+++ b/test/stress_test/configs/simple.cfg
@@ -0,0 +1,16 @@
+num_processes: 1
+num_threads: 1
+max_events: 1000
+nesting: 8
+
+# 500 events/s, ~128 bytes/event ~= 64 KB/s
+steady_state_timings {
+ rate_mean: 500
+ payload_mean: 128
+}
+
+trace_config {
+ duration_ms: 3000
+ buffers { size_kb: 8000 }
+ data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/stalls.cfg b/test/stress_test/configs/stalls.cfg
new file mode 100644
index 0000000..344194f
--- /dev/null
+++ b/test/stress_test/configs/stalls.cfg
@@ -0,0 +1,15 @@
+num_processes: 8
+num_threads: 4
+max_events: 10000
+
+# 1K events/s * 10K = 10 MB/s per thread
+steady_state_timings {
+ rate_mean: 1000
+ payload_mean: 10000
+}
+
+trace_config {
+ duration_ms: 5000
+ buffers { size_kb: 500000 }
+ data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/configs/xxl_packets.cfg b/test/stress_test/configs/xxl_packets.cfg
new file mode 100644
index 0000000..fbafb2a
--- /dev/null
+++ b/test/stress_test/configs/xxl_packets.cfg
@@ -0,0 +1,22 @@
+# Four threads writing large and nested packets of 32MB each every second.
+
+num_processes: 1
+num_threads: 4
+max_events: 5
+nesting: 2
+
+# Each writer will write packets of 16 MB ((1 + nesting=1) x payload 8MB)
+steady_state_timings {
+ rate_mean: 1
+ payload_mean: 8000000
+ payload_write_time_ms: 100
+}
+
+trace_config {
+ duration_ms: 10000
+ buffers {
+ size_kb: 500000
+ fill_policy: DISCARD
+ }
+ data_sources { config { name: "perfetto.stress_test" } }
+}
diff --git a/test/stress_test/gen_configs_blob.py b/test/stress_test/gen_configs_blob.py
new file mode 100644
index 0000000..ba1f2c7
--- /dev/null
+++ b/test/stress_test/gen_configs_blob.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python3
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Compiles the stress_test configs protos and bundles in a .h C++ array.
+
+This scripts takes all the configs in /test/stress_test/configs, compiles them
+with protoc and generates a C++ header which contains the configs' names and
+proto-encoded bytes.
+
+This is invoked by the build system and is used by the stress_test runner. The
+goal is making the stress_test binary hermetic and not depend on the repo.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+import os
+import sys
+import argparse
+import shutil
+import subprocess
+
+CUR_DIR = os.path.dirname(os.path.realpath(__file__))
+ROOT_DIR = os.path.dirname(os.path.dirname(CUR_DIR))
+CONFIGS_DIR = os.path.join(CUR_DIR, 'configs')
+
+
+def find_protoc():
+ for root, _, files in os.walk(os.path.join(ROOT_DIR, 'out')):
+ if 'protoc' in files:
+ return os.path.join(root, 'protoc')
+ return None
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--protoc')
+ parser.add_argument('--out', required=True)
+ parser.add_argument('cfgfiles', nargs='+')
+ args = parser.parse_args()
+
+ protoc = args.protoc or find_protoc()
+ assert protoc, 'protoc not found, pass --protoc /path/to/protoc'
+ assert os.path.exists(protoc), '{} does not exist'.format(protoc)
+ if protoc is not args.protoc:
+ print('Using protoc: {}'.format(protoc))
+
+ blobs = {}
+ for cfg_path in args.cfgfiles:
+ cfg_name = os.path.splitext(cfg_path)[0].split(os.sep)[-1]
+ with open(cfg_path, 'r') as in_file:
+ compiled_proto = subprocess.check_output([
+ protoc,
+ '--encode=perfetto.protos.StressTestConfig',
+ '--proto_path=' + ROOT_DIR,
+ os.path.join(ROOT_DIR, 'protos', 'perfetto', 'config',
+ 'stress_test_config.proto'),
+ ],
+ stdin=in_file)
+ blobs[cfg_name] = bytearray(compiled_proto)
+
+ # Write the C++ header file
+ fout = open(args.out, 'wb')
+ include_guard = args.out.replace('/', '_').replace('.', '_').upper() + '_'
+ fout.write("""
+#ifndef {include_guard}
+#define {include_guard}
+
+#include <stddef.h>
+#include <stdint.h>
+
+// This file was autogenerated by ${gen_script}. Do not edit.
+
+namespace perfetto {{
+namespace {{
+
+struct StressTestConfigBlob {{
+ const char* name;
+ const uint8_t* data;
+ size_t size;
+}};\n\n""".format(
+ gen_script=__file__,
+ include_guard=include_guard,
+ ).encode())
+
+ configs_arr = '\nconst StressTestConfigBlob kStressTestConfigs[] = {\n'
+ for cfg_name, blob in blobs.items():
+ arr_str = ','.join(str(b) for b in blob)
+ line = 'const uint8_t _config_%s[]{%s};\n' % (cfg_name, arr_str)
+ fout.write(line.encode())
+ configs_arr += ' {{"{n}", _config_{n}, sizeof(_config_{n})}},\n'.format(
+ n=cfg_name)
+ configs_arr += '};\n'
+ fout.write(configs_arr.encode())
+ fout.write("""
+} // namespace
+} // namespace perfetto
+#endif\n""".encode())
+ fout.close()
+
+
+if __name__ == '__main__':
+ exit(main())
diff --git a/test/stress_test/stress_producer.cc b/test/stress_test/stress_producer.cc
new file mode 100644
index 0000000..2845efc
--- /dev/null
+++ b/test/stress_test/stress_producer.cc
@@ -0,0 +1,224 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <math.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <list>
+#include <random>
+#include <thread>
+
+#include "perfetto/base/time.h"
+#include "perfetto/ext/base/file_utils.h"
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/tracing.h"
+
+#include "protos/perfetto/config/stress_test_config.gen.h"
+#include "protos/perfetto/trace/test_event.pbzero.h"
+
+using StressTestConfig = perfetto::protos::gen::StressTestConfig;
+
+namespace perfetto {
+namespace {
+
+StressTestConfig* g_cfg;
+
+class StressTestDataSource : public DataSource<StressTestDataSource> {
+ public:
+ constexpr static BufferExhaustedPolicy kBufferExhaustedPolicy =
+ BufferExhaustedPolicy::kStall;
+
+ void OnSetup(const SetupArgs& args) override;
+ void OnStart(const StartArgs&) override;
+ void OnStop(const StopArgs&) override;
+
+ private:
+ class Worker {
+ public:
+ explicit Worker(uint32_t id) : id_(id) {}
+ void Start();
+ void Stop();
+ ~Worker() { Stop(); }
+
+ private:
+ void WorkerMain(uint32_t worker_id);
+ void FillPayload(const StressTestConfig::WriterTiming&,
+ uint32_t seq,
+ uint32_t nesting,
+ protos::pbzero::TestEvent::TestPayload*);
+
+ const uint32_t id_;
+ std::thread thread_;
+ std::atomic<bool> quit_;
+ std::minstd_rand0 rnd_seq_;
+
+ // Use a different engine for the generation of random value, keep rnd_seq_
+ // dedicated to generating deterministic sequences.
+ std::minstd_rand0 rnd_gen_;
+ };
+
+ std::list<Worker> workers_;
+};
+
+// Called before the tracing session starts.
+void StressTestDataSource::OnSetup(const SetupArgs&) {
+ for (uint32_t i = 0; i < std::max(g_cfg->num_threads(), 1u); ++i)
+ workers_.emplace_back(i);
+}
+
+// Called when the tracing session starts.
+void StressTestDataSource::OnStart(const StartArgs&) {
+ for (auto& worker : workers_)
+ worker.Start();
+}
+
+// Called when the tracing session ends.
+void StressTestDataSource::OnStop(const StopArgs&) {
+ for (auto& worker : workers_)
+ worker.Stop();
+ workers_.clear();
+}
+
+void StressTestDataSource::Worker::Start() {
+ quit_.store(false);
+ thread_ = std::thread(&StressTestDataSource::Worker::WorkerMain, this, id_);
+}
+
+void StressTestDataSource::Worker::Stop() {
+ if (!thread_.joinable() || quit_)
+ return;
+ PERFETTO_DLOG("Stopping worker %u", id_);
+ quit_.store(true);
+ thread_.join();
+}
+
+void StressTestDataSource::Worker::WorkerMain(uint32_t worker_id) {
+ PERFETTO_DLOG("Worker %u starting", worker_id);
+ rnd_seq_ = std::minstd_rand0(0);
+ int64_t t_start = base::GetBootTimeNs().count();
+ int64_t num_msgs = 0;
+
+ const int64_t max_msgs = g_cfg->max_events()
+ ? static_cast<int64_t>(g_cfg->max_events())
+ : INT64_MAX;
+ bool is_last = false;
+ while (!is_last) {
+ is_last = quit_ || ++num_msgs >= max_msgs;
+
+ const int64_t now = base::GetBootTimeNs().count();
+ const auto elapsed_ms = static_cast<uint64_t>((now - t_start) / 1000000);
+
+ const auto* timings = &g_cfg->steady_state_timings();
+ if (g_cfg->burst_period_ms() &&
+ elapsed_ms % g_cfg->burst_period_ms() >
+ (g_cfg->burst_period_ms() - g_cfg->burst_duration_ms())) {
+ timings = &g_cfg->burst_timings();
+ }
+ std::normal_distribution<> rate_dist{timings->rate_mean(),
+ timings->rate_stddev()};
+
+ double period_ns = 1e9 / rate_dist(rnd_gen_);
+ period_ns = isnan(period_ns) || period_ns == 0.0 ? 1 : period_ns;
+ double expected_msgs = static_cast<double>(now - t_start) / period_ns;
+ int64_t delay_ns = 0;
+ if (static_cast<int64_t>(expected_msgs) < num_msgs)
+ delay_ns = static_cast<int64_t>(period_ns);
+ std::this_thread::sleep_for(
+ std::chrono::nanoseconds(static_cast<int64_t>(delay_ns)));
+
+ StressTestDataSource::Trace([&](StressTestDataSource::TraceContext ctx) {
+ const uint32_t seq = static_cast<uint32_t>(rnd_seq_());
+ auto packet = ctx.NewTracePacket();
+ packet->set_timestamp(static_cast<uint64_t>(now));
+ auto* test_event = packet->set_for_testing();
+ test_event->set_seq_value(seq);
+ test_event->set_counter(static_cast<uint64_t>(num_msgs));
+ if (is_last)
+ test_event->set_is_last(true);
+
+ FillPayload(*timings, seq, g_cfg->nesting(), test_event->set_payload());
+ }); // Trace().
+
+ } // while (!quit)
+ PERFETTO_DLOG("Worker done");
+}
+
+void StressTestDataSource::Worker::FillPayload(
+ const StressTestConfig::WriterTiming& timings,
+ uint32_t seq,
+ uint32_t nesting,
+ protos::pbzero::TestEvent::TestPayload* payload) {
+ // Write the payload in two halves, optionally with some delay in the
+ // middle.
+ std::normal_distribution<> msg_size_dist{timings.payload_mean(),
+ timings.payload_stddev()};
+ auto payload_size =
+ static_cast<uint32_t>(std::max(std::round(msg_size_dist(rnd_gen_)), 0.0));
+ std::string buf;
+ buf.resize(payload_size / 2);
+ for (size_t i = 0; i < buf.size(); ++i) {
+ buf[i] = static_cast<char>(33 + ((seq + i) % 64)); // Stay ASCII.
+ }
+ payload->add_str(buf);
+ payload->set_remaining_nesting_depth(nesting);
+ if (timings.payload_write_time_ms() > 0) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(timings.payload_write_time_ms()));
+ }
+
+ if (nesting > 0)
+ FillPayload(timings, seq, nesting - 1, payload->add_nested());
+
+ payload->add_str(buf);
+}
+} // namespace
+
+PERFETTO_DEFINE_DATA_SOURCE_STATIC_MEMBERS(StressTestDataSource);
+
+} // namespace perfetto
+
+int main() {
+ perfetto::TracingInitArgs args;
+ args.backends = perfetto::kSystemBackend;
+
+ std::string config_blob;
+ if (isatty(STDIN_FILENO))
+ PERFETTO_LOG("Reading StressTestConfig proto from stdin");
+ perfetto::base::ReadFileStream(stdin, &config_blob);
+
+ StressTestConfig cfg;
+ perfetto::g_cfg = &cfg;
+ if (config_blob.empty() || !cfg.ParseFromString(config_blob))
+ PERFETTO_FATAL("A StressTestConfig blob must be passed into stdin");
+
+ if (cfg.shmem_page_size_kb())
+ args.shmem_page_size_hint_kb = cfg.shmem_page_size_kb();
+ if (cfg.shmem_size_kb())
+ args.shmem_page_size_hint_kb = cfg.shmem_size_kb();
+
+ perfetto::Tracing::Initialize(args);
+ perfetto::DataSourceDescriptor dsd;
+ dsd.set_name("perfetto.stress_test");
+ perfetto::StressTestDataSource::Register(dsd);
+
+ for (;;) {
+ pause();
+ }
+}
diff --git a/test/stress_test/stress_test.cc b/test/stress_test/stress_test.cc
new file mode 100644
index 0000000..d66039e
--- /dev/null
+++ b/test/stress_test/stress_test.cc
@@ -0,0 +1,497 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#include <stdarg.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+#include <chrono>
+#include <list>
+#include <map>
+#include <random>
+#include <regex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/scoped_file.h"
+#include "perfetto/ext/base/subprocess.h"
+#include "perfetto/ext/base/temp_file.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "perfetto/tracing.h"
+#include "perfetto/tracing/core/forward_decls.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "src/base/test/utils.h"
+
+#include "protos/perfetto/config/stress_test_config.gen.h"
+#include "protos/perfetto/trace/test_event.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+
+// Generated by gen_configs_blob.py. It defines the kStressTestConfigs array,
+// which contains a proto-encoded StressTestConfig message for each .cfg file
+// listed in /test/stress_test/configs/BUILD.gn.
+#include "test/stress_test/configs/stress_test_config_blobs.h"
+
+namespace perfetto {
+namespace {
+
+using StressTestConfig = protos::gen::StressTestConfig;
+
+struct SigHandlerCtx {
+ std::atomic<bool> aborted{};
+ std::vector<int> pids_to_kill;
+};
+SigHandlerCtx* g_sig;
+
+struct TestResult {
+ const char* cfg_name = nullptr;
+ StressTestConfig cfg;
+ uint32_t run_time_ms = 0;
+ uint32_t trace_size_kb = 0;
+ uint32_t num_packets = 0;
+ uint32_t num_threads = 0;
+ uint32_t num_errors = 0;
+ base::Subprocess::ResourceUsage svc_rusage;
+ base::Subprocess::ResourceUsage prod_rusage;
+};
+
+struct ParsedTraceStats {
+ struct WriterThread {
+ uint64_t packets_seen = 0;
+ bool last_seen = false;
+ uint32_t last_seq = 0;
+ uint64_t seq_errors = 0;
+ uint64_t counter_errors = 0;
+ std::minstd_rand0 rnd_engine;
+ };
+
+ // One for each trusted_packet_sequence_id.
+ std::map<uint32_t, WriterThread> threads;
+};
+
+class TestHarness {
+ public:
+ TestHarness();
+ void RunConfig(const char* cfg_name, const StressTestConfig&, bool verbose);
+ const std::list<TestResult>& test_results() const { return test_results_; }
+
+ private:
+ void ReadbackTrace(const std::string&, ParsedTraceStats*);
+ void ParseTracePacket(const uint8_t*, size_t, ParsedTraceStats* ctx);
+ void AddFailure(const char* fmt, ...) PERFETTO_PRINTF_FORMAT(2, 3);
+
+ std::vector<std::string> env_;
+ std::list<TestResult> test_results_;
+ std::string results_dir_;
+ base::ScopedFile error_log_;
+};
+
+TestHarness::TestHarness() {
+ results_dir_ = base::GetSysTempDir() + "/perfetto-stress-test";
+ system(("rm -r -- \"" + results_dir_ + "\"").c_str());
+ PERFETTO_CHECK(mkdir(results_dir_.c_str(), 0755) == 0);
+ PERFETTO_LOG("Saving test results in %s", results_dir_.c_str());
+}
+
+void TestHarness::AddFailure(const char* fmt, ...) {
+ ++test_results_.back().num_errors;
+
+ char log_msg[512];
+ va_list args;
+ va_start(args, fmt);
+ int res = vsnprintf(log_msg, sizeof(log_msg), fmt, args);
+ va_end(args);
+
+ PERFETTO_ELOG("FAIL: %s", log_msg);
+
+ if (res > 0 && static_cast<size_t>(res) < sizeof(log_msg) - 2) {
+ log_msg[res++] = '\n';
+ log_msg[res++] = '\0';
+ }
+ base::ignore_result(write(*error_log_, log_msg, static_cast<size_t>(res)));
+}
+
+void TestHarness::RunConfig(const char* cfg_name,
+ const StressTestConfig& cfg,
+ bool verbose) {
+ test_results_.emplace_back();
+ TestResult& test_result = test_results_.back();
+ test_result.cfg_name = cfg_name;
+ test_result.cfg = cfg;
+ g_sig->pids_to_kill.clear();
+
+ auto result_dir = results_dir_ + "/" + cfg_name;
+ PERFETTO_CHECK(!mkdir(result_dir.c_str(), 0755));
+ error_log_ = base::OpenFile(result_dir + "/errors.log",
+ O_RDWR | O_CREAT | O_TRUNC, 0644);
+
+ PERFETTO_ILOG("Starting \"%s\" - %s", cfg_name, result_dir.c_str());
+
+ env_.emplace_back("PERFETTO_PRODUCER_SOCK_NAME=" + result_dir +
+ "/producer.sock");
+ env_.emplace_back("PERFETTO_CONSUMER_SOCK_NAME=" + result_dir +
+ "/consumer.sock");
+ std::string bin_dir = base::GetCurExecutableDir();
+
+ // Start the service.
+ base::Subprocess traced({bin_dir + "/traced"});
+ traced.args.env = env_;
+ if (!verbose) {
+ traced.args.out_fd = base::OpenFile(result_dir + "/traced.log",
+ O_RDWR | O_CREAT | O_TRUNC, 0644);
+ traced.args.stderr_mode = traced.args.stdout_mode = base::Subprocess::kFd;
+ }
+ traced.Start();
+ g_sig->pids_to_kill.emplace_back(traced.pid());
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ PERFETTO_CHECK(traced.Poll() == base::Subprocess::kRunning);
+
+ // Start the producer processes.
+ std::list<base::Subprocess> producers;
+ for (uint32_t i = 0; i < cfg.num_processes(); ++i) {
+ producers.emplace_back(base::Subprocess({bin_dir + "/stress_producer"}));
+ auto& producer = producers.back();
+ producer.args.input = cfg.SerializeAsString();
+ if (!verbose) {
+ producer.args.out_fd =
+ base::OpenFile(result_dir + "/producer." + std::to_string(i) + ".log",
+ O_RDWR | O_CREAT | O_TRUNC, 0644);
+ producer.args.stderr_mode = producer.args.stdout_mode =
+ base::Subprocess::kFd;
+ }
+ producer.args.env = env_;
+ producer.Start();
+ g_sig->pids_to_kill.emplace_back(producer.pid());
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ for (auto& producer : producers)
+ PERFETTO_CHECK(producer.Poll() == base::Subprocess::kRunning);
+
+ auto trace_file_path = result_dir + "/trace";
+ base::Subprocess consumer(
+ {bin_dir + "/perfetto", "-c", "-", "-o", trace_file_path.c_str()});
+ consumer.args.env = env_;
+ consumer.args.input = cfg.trace_config().SerializeAsString();
+ if (!verbose) {
+ consumer.args.out_fd = base::OpenFile(result_dir + "/perfetto.log",
+ O_RDWR | O_CREAT | O_TRUNC, 0644);
+ consumer.args.stderr_mode = consumer.args.stdout_mode =
+ base::Subprocess::kFd;
+ }
+ unlink(trace_file_path.c_str());
+ consumer.Start();
+ int64_t t_start = base::GetBootTimeNs().count();
+ g_sig->pids_to_kill.emplace_back(consumer.pid());
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ PERFETTO_CHECK(consumer.Poll() == base::Subprocess::kRunning);
+
+ if (!consumer.Wait(
+ static_cast<int>(cfg.trace_config().duration_ms() + 30000))) {
+ AddFailure("Consumer didn't quit in time");
+ consumer.KillAndWaitForTermination();
+ }
+
+ // Stop
+ consumer.KillAndWaitForTermination(SIGTERM);
+ int64_t t_end = base::GetBootTimeNs().count();
+
+ for (auto& producer : producers) {
+ producer.KillAndWaitForTermination();
+ test_result.prod_rusage = producer.rusage(); // Only keep last one
+ }
+ producers.clear();
+ traced.KillAndWaitForTermination();
+
+ test_result.svc_rusage = traced.rusage();
+ test_result.run_time_ms = static_cast<uint32_t>((t_end - t_start) / 1000000);
+
+ // Verify
+ // TODO(primiano): read back the TraceStats and check them as well.
+ ParsedTraceStats ctx;
+ ReadbackTrace(trace_file_path, &ctx);
+ auto exp_thd = cfg.num_processes() * cfg.num_threads();
+ if (ctx.threads.size() != exp_thd) {
+ AddFailure("Trace threads mismatch. Expected %u threads, got %zu", exp_thd,
+ ctx.threads.size());
+ }
+ for (const auto& it : ctx.threads) {
+ uint32_t seq_id = it.first;
+ const auto& thd = it.second;
+ if (!thd.last_seen) {
+ AddFailure("Last packet not seen for sequence %u", seq_id);
+ }
+ if (thd.seq_errors > 0) {
+ AddFailure("Sequence %u had %" PRIu64 " packets out of sync", seq_id,
+ thd.seq_errors);
+ }
+ if (thd.counter_errors > 0) {
+ AddFailure("Sequence %u had %" PRIu64 " packets counter errors", seq_id,
+ thd.counter_errors);
+ }
+ }
+
+ error_log_.reset();
+ PERFETTO_ILOG("Completed \"%s\"", cfg_name);
+}
+
+void TestHarness::ReadbackTrace(const std::string& trace_file_path,
+ ParsedTraceStats* ctx) {
+ TestResult& test_result = test_results_.back();
+ using namespace protozero::proto_utils;
+ auto fd = base::OpenFile(trace_file_path.c_str(), O_RDONLY);
+ if (!fd)
+ return AddFailure("Trace file does not exist");
+ const off_t file_size = lseek(*fd, 0, SEEK_END);
+ if (file_size <= 0)
+ return AddFailure("Trace file is empty");
+ test_result.trace_size_kb = static_cast<uint32_t>(file_size / 1000);
+ const uint8_t* const start = static_cast<const uint8_t*>(mmap(
+ nullptr, static_cast<size_t>(file_size), PROT_READ, MAP_PRIVATE, *fd, 0));
+ PERFETTO_CHECK(start != MAP_FAILED);
+ const uint8_t* const end = start + file_size;
+
+ constexpr uint8_t kTracePacketTag = MakeTagLengthDelimited(1);
+
+ for (auto* ptr = start; (end - ptr) > 2;) {
+ const uint8_t* tokenizer_start = ptr;
+ if (*(ptr++) != kTracePacketTag) {
+ return AddFailure("Tokenizer failure at offset %zd", ptr - start);
+ }
+ uint64_t packet_size = 0;
+ ptr = ParseVarInt(ptr, end, &packet_size);
+ const uint8_t* const packet_start = ptr;
+ ptr += packet_size;
+ if ((ptr - tokenizer_start) < 2 || ptr > end)
+ return AddFailure("Got invalid packet size %" PRIu64 " at offset %zd",
+ packet_size,
+ static_cast<ssize_t>(packet_start - start));
+ ParseTracePacket(packet_start, static_cast<size_t>(packet_size), ctx);
+ }
+ test_result.num_threads = static_cast<uint32_t>(ctx->threads.size());
+}
+
+void TestHarness::ParseTracePacket(const uint8_t* start,
+ size_t size,
+ ParsedTraceStats* ctx) {
+ TestResult& test_result = test_results_.back();
+ protos::pbzero::TracePacket::Decoder packet(start, size);
+ if (!packet.has_for_testing())
+ return;
+
+ ++test_result.num_packets;
+ const uint32_t seq_id = packet.trusted_packet_sequence_id();
+
+ protos::pbzero::TestEvent::Decoder te(packet.for_testing());
+ auto t_it = ctx->threads.find(seq_id);
+ bool is_first_packet = false;
+ if (t_it == ctx->threads.end()) {
+ is_first_packet = true;
+ t_it = ctx->threads.emplace(seq_id, ParsedTraceStats::WriterThread()).first;
+ }
+ ParsedTraceStats::WriterThread& thd = t_it->second;
+
+ ++thd.packets_seen;
+ if (te.is_last()) {
+ if (thd.last_seen) {
+ return AddFailure(
+ "last_seen=true happened more than once for sequence %u", seq_id);
+ } else {
+ thd.last_seen = true;
+ }
+ }
+ if (is_first_packet) {
+ thd.rnd_engine = std::minstd_rand0(te.seq_value());
+ } else {
+ const uint32_t expected = static_cast<uint32_t>(thd.rnd_engine());
+ if (te.seq_value() != expected) {
+ thd.rnd_engine = std::minstd_rand0(te.seq_value()); // Resync the engine.
+ ++thd.seq_errors;
+ return AddFailure(
+ "TestEvent seq mismatch for sequence %u. Expected %u got %u", seq_id,
+ expected, te.seq_value());
+ }
+ if (te.counter() != thd.packets_seen) {
+ return AddFailure(
+ "TestEvent counter mismatch for sequence %u. Expected %" PRIu64
+ " got %" PRIu64,
+ seq_id, thd.packets_seen, te.counter());
+ }
+ }
+
+ if (!te.has_payload()) {
+ return AddFailure("TestEvent %u for sequence %u has no payload",
+ te.seq_value(), seq_id);
+ }
+
+ // Check the validity of the payload. The payload might be nested. If that is
+ // the case, we need to check all levels.
+ protozero::ConstBytes payload_bounds = te.payload();
+ for (uint32_t depth = 0, last_depth = 0;; depth++) {
+ if (depth > 100) {
+ return AddFailure("Unexpectedly deep depth for event %u, sequence %u",
+ te.seq_value(), seq_id);
+ }
+ protos::pbzero::TestEvent::TestPayload::Decoder payload(payload_bounds);
+ const uint32_t rem_depth = payload.remaining_nesting_depth();
+
+ // The payload is a repeated field and must have exactly two instances.
+ // The writer splits it always in two halves of identical size.
+ int num_payload_pieces = 0;
+ size_t last_size = 0;
+ for (auto it = payload.str(); it; ++it, ++num_payload_pieces) {
+ protozero::ConstChars payload_str = *it;
+ last_size = last_size ? last_size : payload_str.size;
+ if (payload_str.size != last_size) {
+ return AddFailure(
+ "Asymmetrical payload at depth %u, event id %u, sequence %u. "
+ "%zu != %zu",
+ depth, te.seq_value(), seq_id, last_size, payload_str.size);
+ }
+ // Check that the payload content matches the expected sequence.
+ for (size_t i = 0; i < payload_str.size; i++) {
+ char exp = static_cast<char>(33 + ((te.seq_value() + i) % 64));
+ if (payload_str.data[i] != exp) {
+ return AddFailure(
+ "Payload mismatch at %zu, depth %u, event id %u, sequence %u. "
+ "Expected: 0x%x, Actual: 0x%x",
+ i, depth, te.seq_value(), seq_id, exp, payload_str.data[i]);
+ }
+ }
+ }
+ if (num_payload_pieces != 2) {
+ return AddFailure(
+ "Broken payload at depth %u, event id %u, sequence %u. "
+ "Expecting 2 repeated str fields, got %d",
+ depth, te.seq_value(), seq_id, num_payload_pieces);
+ }
+
+ if (depth > 0 && rem_depth != last_depth - 1) {
+ return AddFailure(
+ "Unexpected nesting level (expected: %u, actual: %u) at depth %u, "
+ "event id %u, sequence %u",
+ rem_depth, last_depth - 1, depth, te.seq_value(), seq_id);
+ }
+
+ last_depth = rem_depth;
+ if (rem_depth == 0)
+ break;
+ if (payload.has_nested()) {
+ payload_bounds = *payload.nested();
+ } else {
+ payload_bounds = {nullptr, 0};
+ }
+ }
+}
+
+void CtrlCHandler(int) {
+ g_sig->aborted.store(true);
+ for (auto it = g_sig->pids_to_kill.rbegin(); it != g_sig->pids_to_kill.rend();
+ it++) {
+ kill(*it, SIGKILL);
+ }
+}
+
+void StressTestMain(int argc, char** argv) {
+ TestHarness th;
+ std::regex filter;
+ bool has_filter = false;
+
+ bool verbose = false;
+ for (int i = 1; i < argc; ++i) {
+ if (!strcmp(argv[i], "-v")) {
+ verbose = true;
+ } else {
+ filter = std::regex(argv[i], std::regex::ECMAScript | std::regex::icase);
+ has_filter = true;
+ }
+ }
+
+ g_sig = new SigHandlerCtx();
+ signal(SIGINT, CtrlCHandler);
+
+ for (size_t i = 0; i < base::ArraySize(kStressTestConfigs) && !g_sig->aborted;
+ ++i) {
+ const auto& cfg_blob = kStressTestConfigs[i];
+ StressTestConfig cfg;
+ std::cmatch ignored;
+ if (has_filter && !std::regex_search(cfg_blob.name, ignored, filter)) {
+ continue;
+ }
+ PERFETTO_CHECK(cfg.ParseFromArray(cfg_blob.data, cfg_blob.size));
+ th.RunConfig(cfg_blob.name, cfg, verbose);
+ }
+
+ for (const auto& tres : th.test_results()) {
+ const auto& cfg = tres.cfg;
+ printf("===============================================================\n");
+ printf("Config: %s\n", tres.cfg_name);
+ printf("===============================================================\n");
+ printf("%-20s %-10s %-10s\n", "Metric", "Expected", "Actual");
+ printf("%-20s %-10s %-10s\n", "------", "--------", "------");
+ printf("%-20s %-10d %-10d\n", "#Errors", 0, tres.num_errors);
+ printf("%-20s %-10d %-10d \n", "Duration [ms]",
+ cfg.trace_config().duration_ms(), tres.run_time_ms);
+
+ uint32_t exp_threads = cfg.num_processes() * cfg.num_threads();
+ printf("%-20s %-10u %-10u\n", "Num threads", exp_threads, tres.num_threads);
+
+ double dur_s = cfg.trace_config().duration_ms() / 1e3;
+ double exp_per_thread = cfg.steady_state_timings().rate_mean() * dur_s;
+ if (cfg.burst_period_ms()) {
+ double burst_rate = 1.0 * cfg.burst_duration_ms() / cfg.burst_period_ms();
+ exp_per_thread *= 1.0 - burst_rate;
+ exp_per_thread += burst_rate * cfg.burst_timings().rate_mean() * dur_s;
+ }
+ if (cfg.max_events())
+ exp_per_thread = std::min(exp_per_thread, 1.0 * cfg.max_events());
+ double exp_packets = std::round(exp_per_thread * exp_threads);
+ printf("%-20s %-10.0f %-10d\n", "Num packets", exp_packets,
+ tres.num_packets);
+
+ double exp_size_kb = exp_packets * (cfg.nesting() + 1) *
+ (cfg.steady_state_timings().payload_mean() + 40) /
+ 1000;
+ printf("%-20s ~%-9.0f %-10d\n", "Trace size [KB]", exp_size_kb,
+ tres.trace_size_kb);
+
+ double exp_rss_mb = cfg.trace_config().buffers()[0].size_kb() / 1000;
+ printf("%-20s (max) %-4.0f %-10d\n", "Svc RSS [MB]", exp_rss_mb,
+ tres.svc_rusage.max_rss_kb / 1000);
+ printf("%-20s %-10s %-10d\n", "Svc CPU [ms]", "---",
+ tres.svc_rusage.cpu_time_ms());
+ printf("%-20s %-10s %d / %d\n", "Svc #ctxswitch", "---",
+ tres.svc_rusage.invol_ctx_switch, tres.svc_rusage.vol_ctx_switch);
+
+ printf("%-20s %-10s %-10d\n", "Prod RSS [MB]", "---",
+ tres.prod_rusage.max_rss_kb / 1000);
+ printf("%-20s %-10s %-10d\n", "Prod CPU [ms]", "---",
+ tres.prod_rusage.cpu_time_ms());
+ printf("%-20s %-10s %d / %d\n", "Prod #ctxswitch", "---",
+ tres.prod_rusage.invol_ctx_switch, tres.prod_rusage.vol_ctx_switch);
+ printf("\n");
+ }
+}
+
+} // namespace
+} // namespace perfetto
+
+int main(int argc, char** argv) {
+ perfetto::StressTestMain(argc, argv);
+}