Merge "trace_processor: optimize filtering for sched slice and generify"
diff --git a/.travis.yml b/.travis.yml
index fcb7192..7e5ed01 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -66,11 +66,13 @@
sudo: false
compiler: clang
env: CFG=linux_trusty-clang-x86_64-msan GN_ARGS="is_debug=false is_msan=true"
+# TODO(b/117093687): ubsan always times out in Travis.
+# Re-enable once that is fixed.
- os: linux
dist: trusty
sudo: true
compiler: clang
- env: CFG=linux_trusty-clang-x86_64-ubsan_asan_lsan GN_ARGS="is_debug=false is_ubsan=true is_asan=true is_lsan=true"
+ env: CFG=linux_trusty-clang-x86_64-asan_lsan GN_ARGS="is_debug=false is_asan=true is_lsan=true"
- os: linux
dist: trusty
sudo: false
diff --git a/Android.bp b/Android.bp
index e7f0d6c..541e66f 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3850,6 +3850,8 @@
"src/profiling/memory/heapprofd_integrationtest.cc",
"src/profiling/memory/record_reader.cc",
"src/profiling/memory/record_reader_unittest.cc",
+ "src/profiling/memory/sampler.cc",
+ "src/profiling/memory/sampler_unittest.cc",
"src/profiling/memory/socket_listener.cc",
"src/profiling/memory/socket_listener_unittest.cc",
"src/profiling/memory/string_interner.cc",
@@ -3857,6 +3859,7 @@
"src/profiling/memory/unwinding.cc",
"src/profiling/memory/unwinding_unittest.cc",
"src/profiling/memory/wire_protocol.cc",
+ "src/profiling/memory/wire_protocol_unittest.cc",
"src/protozero/message.cc",
"src/protozero/message_handle.cc",
"src/protozero/message_handle_unittest.cc",
diff --git a/buildtools/.gitignore b/buildtools/.gitignore
index 1dd47b9..f33608c 100644
--- a/buildtools/.gitignore
+++ b/buildtools/.gitignore
@@ -1,3 +1,4 @@
+android-core/
android_sdk/
aosp-*/
benchmark/
@@ -10,15 +11,15 @@
libbacktrace/
libcxx/
libcxxabi/
+libfuzzer/
libunwind/
linenoise/
linux/
linux64/
+lzma/
mac/
ndk/
nodejs/
protobuf/
sqlite/
-android-core/
-lzma/
test_data/
diff --git a/gn/standalone/wasm.gni b/gn/standalone/wasm.gni
index 055af6a..fa6f58b 100644
--- a/gn/standalone/wasm.gni
+++ b/gn/standalone/wasm.gni
@@ -54,7 +54,9 @@
"-s",
"NO_DYNAMIC_EXECUTION=1",
"-s",
- "TOTAL_MEMORY=536870912",
+ "TOTAL_MEMORY=33554432",
+ "-s",
+ "ALLOW_MEMORY_GROWTH=1",
"-s",
"RESERVED_FUNCTION_POINTERS=32",
"-s",
diff --git a/include/perfetto/base/unix_socket.h b/include/perfetto/base/unix_socket.h
index 7daf5a2..4a613a0 100644
--- a/include/perfetto/base/unix_socket.h
+++ b/include/perfetto/base/unix_socket.h
@@ -54,6 +54,16 @@
base::ScopedFile CreateSocket();
+// Update msghdr so subsequent sendmsg will send data that remains after n bytes
+// have already been sent.
+// This should not be used, it's exported for test use only.
+void ShiftMsgHdr(size_t n, struct msghdr* msg);
+
+// Re-enter sendmsg until all the data has been sent or an error occurs.
+//
+// TODO(fmayer): Figure out how to do timeouts here for heapprofd.
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags);
+
// A non-blocking UNIX domain socket in SOCK_STREAM mode. Allows also to
// transfer file descriptors. None of the methods in this class are blocking.
// The main design goal is API simplicity and strong guarantees on the
@@ -170,6 +180,8 @@
// EventListener::OnDisconnect() will be called.
// If the socket is not connected, Send() will just return false.
// Does not append a null string terminator to msg in any case.
+ //
+ // DO NOT PASS kNonBlocking, it is broken.
bool Send(const void* msg,
size_t len,
int send_fd = -1,
@@ -179,7 +191,8 @@
const int* send_fds,
size_t num_fds,
BlockingMode blocking = BlockingMode::kNonBlocking);
- bool Send(const std::string& msg);
+  bool Send(const std::string& msg,  // Sent including its trailing '\0'.
+            BlockingMode blocking = BlockingMode::kNonBlocking);
// Returns the number of bytes (<= |len|) written in |msg| or 0 if there
// is no data in the buffer to read or an error occurs (in which case a
diff --git a/include/perfetto/traced/sys_stats_counters.h b/include/perfetto/traced/sys_stats_counters.h
index 5bb0974..98d25af 100644
--- a/include/perfetto/traced/sys_stats_counters.h
+++ b/include/perfetto/traced/sys_stats_counters.h
@@ -30,6 +30,7 @@
};
constexpr KeyAndId kMeminfoKeys[] = {
+ {"MemUnspecified", protos::pbzero::MeminfoCounters::MEMINFO_UNSPECIFIED},
{"MemTotal", protos::pbzero::MeminfoCounters::MEMINFO_MEM_TOTAL},
{"MemFree", protos::pbzero::MeminfoCounters::MEMINFO_MEM_FREE},
{"MemAvailable", protos::pbzero::MeminfoCounters::MEMINFO_MEM_AVAILABLE},
@@ -66,6 +67,7 @@
};
const KeyAndId kVmstatKeys[] = {
+ {"VmstatUnspecified", protos::pbzero::VmstatCounters::VMSTAT_UNSPECIFIED},
{"nr_free_pages", protos::pbzero::VmstatCounters::VMSTAT_NR_FREE_PAGES},
{"nr_alloc_batch", protos::pbzero::VmstatCounters::VMSTAT_NR_ALLOC_BATCH},
{"nr_inactive_anon",
diff --git a/include/perfetto/tracing/core/producer.h b/include/perfetto/tracing/core/producer.h
index 3f8698f..2c8e6f2 100644
--- a/include/perfetto/tracing/core/producer.h
+++ b/include/perfetto/tracing/core/producer.h
@@ -60,24 +60,40 @@
// instance.
virtual void OnDisconnect() = 0;
- // Called by the Service to turn on one of the data source previously
+ // Called by the Service after OnConnect but before the first DataSource is
+ // created. Can be used for any setup required before tracing begins.
+ virtual void OnTracingSetup() = 0;
+
+ // The lifecycle methods below are always called in the following sequence:
+ // SetupDataSource -> StartDataSource -> StopDataSource.
+ // Or, in the edge case where a trace is aborted immediately:
+ // SetupDataSource -> StopDataSource.
+ // The Setup+Start call sequence is always guaranteed, regardless of the
+ // TraceConfig.deferred_start flags.
+ // Called by the Service to configure one of the data sources previously
// registered through TracingService::ProducerEndpoint::RegisterDataSource().
+ // This method is always called before StartDataSource. There is always a
+ // SetupDataSource() call before each StartDataSource() call.
// Args:
// - DataSourceInstanceID is an identifier chosen by the Service that should
// be assigned to the newly created data source instance. It is used to
// match the StopDataSource() request below.
// - DataSourceConfig is the configuration for the new data source (e.g.,
// tells which trace categories to enable).
+ virtual void SetupDataSource(DataSourceInstanceID,
+ const DataSourceConfig&) = 0;
+
+ // Called by the Service to turn on one of the data sources previously
+ // registered through TracingService::ProducerEndpoint::RegisterDataSource()
+ // and initialized through SetupDataSource().
+ // Both arguments are guaranteed to be identical to the ones passed to the
+ // prior SetupDataSource() call.
virtual void StartDataSource(DataSourceInstanceID,
const DataSourceConfig&) = 0;
// Called by the Service to shut down an existing data source instance.
virtual void StopDataSource(DataSourceInstanceID) = 0;
- // Called by the Service after OnConnect but before the first DataSource is
- // created. Can be used for any setup required before tracing begins.
- virtual void OnTracingSetup() = 0;
-
// Called by the service to request the Producer to commit the data of the
// given data sources and return their chunks into the shared memory buffer.
// The Producer is expected to invoke NotifyFlushComplete(FlushRequestID) on
diff --git a/include/perfetto/tracing/core/trace_config.h b/include/perfetto/tracing/core/trace_config.h
index df0a89e..f7ac92f 100644
--- a/include/perfetto/tracing/core/trace_config.h
+++ b/include/perfetto/tracing/core/trace_config.h
@@ -301,6 +301,9 @@
return &guardrail_overrides_;
}
+ bool deferred_start() const { return deferred_start_; }
+ void set_deferred_start(bool value) { deferred_start_ = value; }
+
private:
std::vector<BufferConfig> buffers_;
std::vector<DataSource> data_sources_;
@@ -313,6 +316,7 @@
uint32_t file_write_period_ms_ = {};
uint64_t max_file_size_bytes_ = {};
GuardrailOverrides guardrail_overrides_ = {};
+ bool deferred_start_ = {};
// Allows to preserve unknown protobuf fields for compatibility
// with future versions of .proto files.
diff --git a/include/perfetto/tracing/core/tracing_service.h b/include/perfetto/tracing/core/tracing_service.h
index 5759368..cdeb575 100644
--- a/include/perfetto/tracing/core/tracing_service.h
+++ b/include/perfetto/tracing/core/tracing_service.h
@@ -120,8 +120,20 @@
// Enables tracing with the given TraceConfig. The ScopedFile argument is
// used only when TraceConfig.write_into_file == true.
+ // If TraceConfig.deferred_start == true data sources are configured via
+ // SetupDataSource() but are not started until StartTracing() is called.
+ // This is to support pre-initialization and fast triggering of traces.
+ // The ScopedFile argument is used only when TraceConfig.write_into_file
+ // == true.
virtual void EnableTracing(const TraceConfig&,
base::ScopedFile = base::ScopedFile()) = 0;
+
+ // Starts all data sources configured in the trace config. This is used only
+ // after calling EnableTracing() with TraceConfig.deferred_start=true.
+ // It's a no-op if called after a regular EnableTracing(), without setting
+ // deferred_start.
+ virtual void StartTracing() = 0;
+
virtual void DisableTracing() = 0;
// Requests all data sources to flush their data immediately and invokes the
diff --git a/infra/perfetto-ci.appspot.com/static/index.html b/infra/perfetto-ci.appspot.com/static/index.html
index 9f62f40..f9e9eee 100644
--- a/infra/perfetto-ci.appspot.com/static/index.html
+++ b/infra/perfetto-ci.appspot.com/static/index.html
@@ -81,7 +81,7 @@
<td id="linux_trusty-clang-x86_64-debug">dbg</td>
<td id="linux_trusty-clang-x86_64-tsan">tsan</td>
<td id="linux_trusty-clang-x86_64-msan">msan</td>
- <td id="linux_trusty-clang-x86_64-ubsan_asan_lsan">{a,l,ub}san</td>
+ <td id="linux_trusty-clang-x86_64-asan_lsan">{a,l}san</td>
<td id="linux_trusty-clang-x86_64-libfuzzer">fuzzer</td>
<td id="ui-clang-x86_64-debug">dbg</td>
<td id="ui-clang-x86_64-release">rel</td>
diff --git a/protos/perfetto/config/perfetto_config.proto b/protos/perfetto/config/perfetto_config.proto
index 91f4aad..595011b 100644
--- a/protos/perfetto/config/perfetto_config.proto
+++ b/protos/perfetto/config/perfetto_config.proto
@@ -381,7 +381,7 @@
// It contains the general config for the logging buffer(s) and the configs for
// all the data source being enabled.
//
-// Next id: 12.
+// Next id: 13.
message TraceConfig {
message BufferConfig {
optional uint32 size_kb = 1;
@@ -487,6 +487,14 @@
}
optional GuardrailOverrides guardrail_overrides = 11;
+
+ // When true, data sources are not started until an explicit call to
+ // StartTracing() on the consumer port. This is to support early
+ // initialization and fast trace triggering. This can be used only when the
+ // Consumer explicitly triggers the StartTracing() method.
+ // This should not be used in a remote trace config via statsd, doing so will
+ // result in a hung trace session.
+ optional bool deferred_start = 12;
}
// End of protos/perfetto/config/trace_config.proto
diff --git a/protos/perfetto/config/trace_config.proto b/protos/perfetto/config/trace_config.proto
index 11ce8d2..08844f2 100644
--- a/protos/perfetto/config/trace_config.proto
+++ b/protos/perfetto/config/trace_config.proto
@@ -29,7 +29,7 @@
// It contains the general config for the logging buffer(s) and the configs for
// all the data source being enabled.
//
-// Next id: 12.
+// Next id: 13.
message TraceConfig {
message BufferConfig {
optional uint32 size_kb = 1;
@@ -135,4 +135,12 @@
}
optional GuardrailOverrides guardrail_overrides = 11;
+
+ // When true, data sources are not started until an explicit call to
+ // StartTracing() on the consumer port. This is to support early
+ // initialization and fast trace triggering. This can be used only when the
+ // Consumer explicitly triggers the StartTracing() method.
+ // This should not be used in a remote trace config via statsd, doing so will
+ // result in a hung trace session.
+ optional bool deferred_start = 12;
}
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index 8c3e9b8..a8d5a84 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -33,8 +33,20 @@
// Enables tracing for one or more data sources. At least one buffer must have
// been previously created. The EnableTracingResponse is sent when tracing is
// disabled (either explicitly or because of the |duration_ms| expired).
+ // The EnableTracingResponse is sent when tracing is disabled (either
+ // explicitly or because of the |duration_ms| expired).
+ // If |deferred_start| == true in the passed TraceConfig, all the tracing
+ // harness is brought up (creating buffers and data sources) without actually
+ // starting the data sources. Data sources will be started upon an explicit
+ // StartTracing() call.
+ // Note that |deferred_start| and StartTracing() have been introduced only
+ // in Android Q and are not supported in Android P.
rpc EnableTracing(EnableTracingRequest) returns (EnableTracingResponse) {}
+ // Starts tracing. Only valid if EnableTracing() was called setting
+ // deferred_start = true in the TraceConfig passed to EnableTracing().
+ rpc StartTracing(StartTracingRequest) returns (StartTracingResponse) {}
+
// Disables tracing for one or more data sources.
rpc DisableTracing(DisableTracingRequest) returns (DisableTracingResponse) {}
@@ -70,6 +82,11 @@
oneof state { bool disabled = 1; }
}
+// Arguments for rpc StartTracing().
+message StartTracingRequest {}
+
+message StartTracingResponse {}
+
// Arguments for rpc DisableTracing().
message DisableTracingRequest {
// TODO: not supported yet, selectively disable only some data sources.
diff --git a/protos/perfetto/ipc/producer_port.proto b/protos/perfetto/ipc/producer_port.proto
index b743914..0a27f82 100644
--- a/protos/perfetto/ipc/producer_port.proto
+++ b/protos/perfetto/ipc/producer_port.proto
@@ -118,8 +118,18 @@
message GetAsyncCommandRequest {}
message GetAsyncCommandResponse {
+ // Called after SetupTracing and before StartDataSource.
+ // This message was introduced in Android Q.
+ message SetupDataSource {
+ optional uint64 new_instance_id = 1;
+ optional protos.DataSourceConfig config = 2;
+ }
+
message StartDataSource {
optional uint64 new_instance_id = 1;
+
+ // For backwards compat reasons (with Android P), the config passed here
+ // is identical to the one passed to SetupDataSource.config.
optional protos.DataSourceConfig config = 2;
}
@@ -140,10 +150,12 @@
optional uint64 request_id = 2;
}
+ // Next id: 7.
oneof cmd {
+ SetupTracing setup_tracing = 3;
+ SetupDataSource setup_data_source = 6;
StartDataSource start_data_source = 1;
StopDataSource stop_data_source = 2;
- SetupTracing setup_tracing = 3;
// id == 4 was teardown_tracing, never implemented.
Flush flush = 5;
}
diff --git a/src/base/unix_socket.cc b/src/base/unix_socket.cc
index e3ed5f2..fb4f5ff 100644
--- a/src/base/unix_socket.cc
+++ b/src/base/unix_socket.cc
@@ -64,6 +64,55 @@
#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
+void ShiftMsgHdr(size_t n, struct msghdr* msg) {
+  for (size_t idx = 0; idx < msg->msg_iovlen; ++idx) {
+    struct iovec& cur = msg->msg_iov[idx];
+    if (n >= cur.iov_len) {
+      n -= cur.iov_len;  // This iovec went out entirely; skip past it.
+      continue;
+    }
+    // Partially sent iovec: trim the sent prefix and make it the new head.
+    cur.iov_base = static_cast<char*>(cur.iov_base) + n;
+    cur.iov_len -= n;
+    msg->msg_iov = &cur;
+    msg->msg_iovlen -= idx;
+    return;
+  }
+  // Every iovec was consumed; |n| must equal their combined length.
+  PERFETTO_CHECK(n == 0);
+  msg->msg_iovlen = 0;
+  msg->msg_iov = nullptr;
+}
+
+// Re-enters sendmsg() until |msg| is fully sent. Returns the total bytes sent
+// (short on EAGAIN), or the failing sendmsg() result (<= 0). Mutates |msg|.
+// Partial sends are real: in the Linux kernel, sock_stream_sendmsg returns
+// early with the bytes already sent if sock_alloc_send_pskb returns NULL [1],
+// which it does when it gets interrupted [2].
+// [1]:
+// https://elixir.bootlin.com/linux/v4.18.10/source/net/unix/af_unix.c#L1872
+// [2]: https://elixir.bootlin.com/linux/v4.18.10/source/net/core/sock.c#L2101
+ssize_t SendMsgAll(int sockfd, struct msghdr* msg, int flags) {
+  // This does not make sense on non-blocking sockets.
+  PERFETTO_DCHECK((fcntl(sockfd, F_GETFL, 0) & O_NONBLOCK) == 0);
+
+  ssize_t total_sent = 0;
+  while (msg->msg_iov) {
+    ssize_t sent = PERFETTO_EINTR(sendmsg(sockfd, msg, flags));
+    if (sent <= 0) {
+      if (sent == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+        return total_sent;
+      return sent;
+    }
+    total_sent += sent;
+    ShiftMsgHdr(static_cast<size_t>(sent), msg);
+    // Only send the ancillary data with the first sendmsg call.
+    msg->msg_control = nullptr;
+    msg->msg_controllen = 0;
+  }
+  return total_sent;
+}
+
ssize_t SockSend(int fd,
const void* msg,
size_t len,
@@ -90,7 +139,7 @@
msg_hdr.msg_controllen = cmsg->cmsg_len;
}
- return PERFETTO_EINTR(sendmsg(fd, &msg_hdr, kNoSigPipe));
+ return SendMsgAll(fd, &msg_hdr, kNoSigPipe);
}
ssize_t SockReceive(int fd,
@@ -396,8 +445,8 @@
}
}
-bool UnixSocket::Send(const std::string& msg) {
- return Send(msg.c_str(), msg.size() + 1);
+bool UnixSocket::Send(const std::string& msg, BlockingMode blocking) {
+ return Send(msg.c_str(), msg.size() + 1, -1, blocking);
}
bool UnixSocket::Send(const void* msg,
@@ -414,6 +463,10 @@
const int* send_fds,
size_t num_fds,
BlockingMode blocking_mode) {
+ // TODO(b/117139237): Non-blocking sends are broken because we do not
+ // properly handle partial sends.
+ PERFETTO_DCHECK(blocking_mode == BlockingMode::kBlocking);
+
if (state_ != State::kConnected) {
errno = last_error_ = ENOTCONN;
return false;
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 5cdd6ab..382f9bc 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -41,6 +41,7 @@
using ::testing::Mock;
constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
+constexpr auto kBlocking = UnixSocket::BlockingMode::kBlocking;
class MockEventListener : public UnixSocket::EventListener {
public:
@@ -115,7 +116,7 @@
auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
.WillOnce(InvokeWithoutArgs(cli_disconnected));
- EXPECT_FALSE(cli->Send("whatever"));
+ EXPECT_FALSE(cli->Send("whatever", kBlocking));
task_runner_.RunUntilCheckpoint("cli_disconnected");
}
@@ -153,8 +154,8 @@
ASSERT_EQ("cli>srv", s->ReceiveString());
srv_did_recv();
}));
- ASSERT_TRUE(cli->Send("cli>srv"));
- ASSERT_TRUE(srv_conn->Send("srv>cli"));
+ ASSERT_TRUE(cli->Send("cli>srv", kBlocking));
+ ASSERT_TRUE(srv_conn->Send("srv>cli", kBlocking));
task_runner_.RunUntilCheckpoint("cli_did_recv");
task_runner_.RunUntilCheckpoint("srv_did_recv");
@@ -168,8 +169,8 @@
ASSERT_EQ("", cli->ReceiveString());
ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
ASSERT_EQ("", srv_conn->ReceiveString());
- ASSERT_FALSE(cli->Send("foo"));
- ASSERT_FALSE(srv_conn->Send("bar"));
+ ASSERT_FALSE(cli->Send("foo", kBlocking));
+ ASSERT_FALSE(srv_conn->Send("bar", kBlocking));
srv->Shutdown(true);
task_runner_.RunUntilCheckpoint("cli_disconnected");
task_runner_.RunUntilCheckpoint("srv_disconnected");
@@ -244,10 +245,10 @@
int buf_fd[2] = {null_fd.get(), zero_fd.get()};
- ASSERT_TRUE(
- cli->Send(cli_str, sizeof(cli_str), buf_fd, base::ArraySize(buf_fd)));
+ ASSERT_TRUE(cli->Send(cli_str, sizeof(cli_str), buf_fd,
+ base::ArraySize(buf_fd), kBlocking));
ASSERT_TRUE(srv_conn->Send(srv_str, sizeof(srv_str), buf_fd,
- base::ArraySize(buf_fd)));
+ base::ArraySize(buf_fd), kBlocking));
task_runner_.RunUntilCheckpoint("srv_did_recv");
task_runner_.RunUntilCheckpoint("cli_did_recv");
@@ -300,7 +301,7 @@
EXPECT_CALL(event_listener_, OnDataAvailable(s))
.WillOnce(Invoke([](UnixSocket* t) {
ASSERT_EQ("PING", t->ReceiveString());
- ASSERT_TRUE(t->Send("PONG"));
+ ASSERT_TRUE(t->Send("PONG", kBlocking));
}));
}));
@@ -309,7 +310,7 @@
EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
.WillOnce(Invoke([](UnixSocket* s, bool success) {
ASSERT_TRUE(success);
- ASSERT_TRUE(s->Send("PING"));
+ ASSERT_TRUE(s->Send("PING", kBlocking));
}));
auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
@@ -356,7 +357,7 @@
.WillOnce(Invoke(
[this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
- ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
+ ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd, kBlocking));
// Wait for the client to change this again.
EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
.WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
@@ -391,7 +392,7 @@
// Now change the shared memory and ping the other process.
memcpy(mem, "rock more", 10);
- ASSERT_TRUE(s->Send("change notify"));
+ ASSERT_TRUE(s->Send("change notify", kBlocking));
checkpoint();
}));
task_runner_.RunUntilCheckpoint("change_seen_by_client");
@@ -409,7 +410,7 @@
int num_frame) {
char buf[kAtomicWrites_FrameSize];
memset(buf, static_cast<char>(num_frame), sizeof(buf));
- if (s->Send(buf, sizeof(buf)))
+ if (s->Send(buf, sizeof(buf), -1, kBlocking))
return true;
task_runner->PostTask(
std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
@@ -560,8 +561,7 @@
char buf[1024 * 32] = {};
tx_task_runner.PostTask([&cli, &buf, all_sent] {
for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
- cli->Send(buf, sizeof(buf), -1 /*fd*/,
- UnixSocket::BlockingMode::kBlocking);
+ cli->Send(buf, sizeof(buf), -1 /*fd*/, kBlocking);
all_sent();
});
tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
@@ -608,8 +608,7 @@
static constexpr size_t kBufSize = 32 * 1024 * 1024;
std::unique_ptr<char[]> buf(new char[kBufSize]());
tx_task_runner.PostTask([&cli, &buf, send_done] {
- bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
- UnixSocket::BlockingMode::kBlocking);
+ bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/, kBlocking);
ASSERT_FALSE(send_res);
send_done();
});
@@ -620,6 +619,166 @@
tx_thread.join();
}
+TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
+ // Send a part of the first iov, then send the rest.
+ struct iovec iov[2] = {};
+ char hello[] = "hello";
+ char world[] = "world";
+ iov[0].iov_base = &hello[0];
+ iov[0].iov_len = base::ArraySize(hello);
+
+ iov[1].iov_base = &world[0];
+ iov[1].iov_len = base::ArraySize(world);
+
+ struct msghdr hdr = {};
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ShiftMsgHdr(1, &hdr);
+ EXPECT_NE(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
+ EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
+ EXPECT_EQ(hdr.msg_iovlen, 2);
+ EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
+ EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
+
+ ShiftMsgHdr(base::ArraySize(hello) - 1, &hdr);
+ EXPECT_EQ(hdr.msg_iov, &iov[1]);
+ EXPECT_EQ(hdr.msg_iovlen, 1);
+ EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
+ EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
+
+ ShiftMsgHdr(base::ArraySize(world), &hdr);
+ EXPECT_EQ(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendFirstAndPartial) {
+ // Send first iov and part of the second iov, then send the rest.
+ struct iovec iov[2] = {};
+ char hello[] = "hello";
+ char world[] = "world";
+ iov[0].iov_base = &hello[0];
+ iov[0].iov_len = base::ArraySize(hello);
+
+ iov[1].iov_base = &world[0];
+ iov[1].iov_len = base::ArraySize(world);
+
+ struct msghdr hdr = {};
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ShiftMsgHdr(base::ArraySize(hello) + 1, &hdr);
+ EXPECT_NE(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 1);
+ EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
+ EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
+
+ ShiftMsgHdr(base::ArraySize(world) - 1, &hdr);
+ EXPECT_EQ(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+TEST_F(UnixSocketTest, ShiftMsgHdrSendEverything) {
+ // Send everything at once.
+ struct iovec iov[2] = {};
+ char hello[] = "hello";
+ char world[] = "world";
+ iov[0].iov_base = &hello[0];
+ iov[0].iov_len = base::ArraySize(hello);
+
+ iov[1].iov_base = &world[0];
+ iov[1].iov_len = base::ArraySize(world);
+
+ struct msghdr hdr = {};
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ShiftMsgHdr(base::ArraySize(world) + base::ArraySize(hello), &hdr);
+ EXPECT_EQ(hdr.msg_iov, nullptr);
+ EXPECT_EQ(hdr.msg_iovlen, 0);
+}
+
+void Handler(int) {}
+
+int RollbackSigaction(const struct sigaction* act) {
+ return sigaction(SIGWINCH, act, nullptr);
+}
+
+TEST_F(UnixSocketTest, PartialSendMsgAll) {
+ int sv[2];
+ ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+ base::ScopedFile send_socket(sv[0]);
+ base::ScopedFile recv_socket(sv[1]);
+
+ // Set bufsize to minimum.
+ int bufsize = 1024;
+ ASSERT_EQ(setsockopt(*send_socket, SOL_SOCKET, SO_SNDBUF, &bufsize,
+ sizeof(bufsize)),
+ 0);
+ ASSERT_EQ(setsockopt(*recv_socket, SOL_SOCKET, SO_RCVBUF, &bufsize,
+ sizeof(bufsize)),
+ 0);
+
+ // Send something larger than send + recv kernel buffers combined to make
+ // sendmsg block.
+ char send_buf[8192];
+ // Make MSAN happy.
+ for (size_t i = 0; i < sizeof(send_buf); ++i)
+ send_buf[i] = static_cast<char>(i % 256);
+ char recv_buf[sizeof(send_buf)];
+
+ // Need to install signal handler to cause the interrupt to happen.
+ // man 3 pthread_kill:
+ // Signal dispositions are process-wide: if a signal handler is
+ // installed, the handler will be invoked in the thread thread, but if
+ // the disposition of the signal is "stop", "continue", or "terminate",
+ // this action will affect the whole process.
+ struct sigaction oldact;
+ struct sigaction newact = {};
+ newact.sa_handler = Handler;
+ ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
+ base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
+ rollback(&oldact);
+
+ auto blocked_thread = pthread_self();
+ std::thread th([blocked_thread, &recv_socket, &recv_buf] {
+ ssize_t rd = PERFETTO_EINTR(read(*recv_socket, recv_buf, 1));
+ ASSERT_EQ(rd, 1);
+ // We are now sure the other thread is in sendmsg, interrupt send.
+ ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
+ // Drain the socket to allow SendMsgAll to succeed.
+ size_t offset = 1;
+ while (offset < sizeof(recv_buf)) {
+ rd = PERFETTO_EINTR(
+ read(*recv_socket, recv_buf + offset, sizeof(recv_buf) - offset));
+ ASSERT_GE(rd, 0);
+ offset += static_cast<size_t>(rd);
+ }
+ });
+
+ // Test sending the send_buf in several chunks as an iov to exercise the
+ // more complicated code-paths of SendMsgAll.
+ struct msghdr hdr = {};
+ struct iovec iov[4];
+ static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
+ "Cannot split buffer into even pieces.");
+ constexpr size_t kChunkSize = sizeof(send_buf) / base::ArraySize(iov);
+ for (size_t i = 0; i < base::ArraySize(iov); ++i) {
+ iov[i].iov_base = send_buf + i * kChunkSize;
+ iov[i].iov_len = kChunkSize;
+ }
+ hdr.msg_iov = iov;
+ hdr.msg_iovlen = base::ArraySize(iov);
+
+ ASSERT_EQ(SendMsgAll(*send_socket, &hdr, 0), sizeof(send_buf));
+ send_socket.reset();
+ th.join();
+ // SendMsgAll() nulls msg_iov once every iovec has been fully sent.
+ ASSERT_EQ(hdr.msg_iov, nullptr);
+ ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
+}
+
// TODO(primiano): add a test to check that in the case of a peer sending a fd
// and the other end just doing a recv (without taking it), the fd is closed and
// not left around.
diff --git a/src/ipc/client_impl_unittest.cc b/src/ipc/client_impl_unittest.cc
index 445e3d0..050fcca 100644
--- a/src/ipc/client_impl_unittest.cc
+++ b/src/ipc/client_impl_unittest.cc
@@ -182,7 +182,8 @@
void Reply(const Frame& frame) {
auto buf = BufferedFrameDeserializer::Serialize(frame);
ASSERT_TRUE(client_sock->is_connected());
- EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd));
+ EXPECT_TRUE(client_sock->Send(buf.data(), buf.size(), next_reply_fd,
+ base::UnixSocket::BlockingMode::kBlocking));
next_reply_fd = -1;
}
diff --git a/src/ipc/host_impl_unittest.cc b/src/ipc/host_impl_unittest.cc
index 3ff1f99..26b56c8 100644
--- a/src/ipc/host_impl_unittest.cc
+++ b/src/ipc/host_impl_unittest.cc
@@ -152,7 +152,8 @@
void SendFrame(const Frame& frame, int fd = -1) {
std::string buf = BufferedFrameDeserializer::Serialize(frame);
- ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd));
+ ASSERT_TRUE(sock_->Send(buf.data(), buf.size(), fd,
+ base::UnixSocket::BlockingMode::kBlocking));
}
BufferedFrameDeserializer frame_deserializer_;
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index 78e57c9..369cb71 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -62,6 +62,8 @@
sources = [
"client.cc",
"client.h",
+ "sampler.cc",
+ "sampler.h",
]
}
@@ -83,9 +85,11 @@
"client_unittest.cc",
"heapprofd_integrationtest.cc",
"record_reader_unittest.cc",
+ "sampler_unittest.cc",
"socket_listener_unittest.cc",
"string_interner_unittest.cc",
"unwinding_unittest.cc",
+ "wire_protocol_unittest.cc",
]
}
diff --git a/src/profiling/memory/sampler.cc b/src/profiling/memory/sampler.cc
new file mode 100644
index 0000000..eba955e
--- /dev/null
+++ b/src/profiling/memory/sampler.cc
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "perfetto/base/utils.h"
+
+namespace perfetto {
+namespace {
+ThreadLocalSamplingData* GetSpecific(pthread_key_t key,
+                                     void* (*unhooked_malloc)(size_t),
+                                     void (*unhooked_free)(void*)) {
+  // This should not be used with glibc as it might re-enter into malloc, see
+  // http://crbug.com/776475.
+  void* slot = pthread_getspecific(key);
+  if (slot != nullptr)
+    return static_cast<ThreadLocalSamplingData*>(slot);
+  // First call on this thread: construct the state in unhooked memory.
+  slot = unhooked_malloc(sizeof(ThreadLocalSamplingData));
+  pthread_setspecific(key, new (slot) ThreadLocalSamplingData(unhooked_free));
+  return static_cast<ThreadLocalSamplingData*>(slot);
+}
+} // namespace
+
+// The algorithm below is inspired by the Chromium sampling algorithm at
+// https://cs.chromium.org/search/?q=f:cc+symbol:AllocatorShimLogAlloc+package:%5Echromium$&type=cs
+
+int64_t ThreadLocalSamplingData::NextSampleInterval(double rate) {
+  std::exponential_distribution<double> interval_dist(1 / rate);
+  const int64_t drawn = static_cast<int64_t>(interval_dist(random_engine_));
+  return drawn >= 1 ? drawn : 1;  // Never hand out an interval below 1.
+}
+
+size_t ThreadLocalSamplingData::ShouldSample(size_t sz, double rate) {
+  // Spend |sz| bytes of the remaining interval, then count how many sampling
+  // points were crossed, drawing a fresh interval after each one.
+  size_t num_samples = 0;
+  interval_to_next_sample_ -= sz;
+  for (; PERFETTO_UNLIKELY(interval_to_next_sample_ <= 0); ++num_samples)
+    interval_to_next_sample_ += NextSampleInterval(rate);
+  return num_samples;
+}
+
+size_t ShouldSample(pthread_key_t key,
+                    size_t sz,
+                    double rate,
+                    void* (*unhooked_malloc)(size_t),
+                    void (*unhooked_free)(void*)) {
+  if (PERFETTO_UNLIKELY(sz >= rate))
+    return 1;  // Allocations of at least |rate| bytes are accounted once.
+  auto* tls = GetSpecific(key, unhooked_malloc, unhooked_free);
+  return tls->ShouldSample(sz, rate);
+}
+
+void ThreadLocalSamplingData::KeyDestructor(void* ptr) {
+  // Read the deallocator out of the object before its destructor runs, then
+  // hand the raw storage back through it.
+  auto* self = static_cast<ThreadLocalSamplingData*>(ptr);
+  auto free_fn = self->unhooked_free_;
+  self->~ThreadLocalSamplingData();
+  free_fn(ptr);
+}
+
+} // namespace perfetto
diff --git a/src/profiling/memory/sampler.h b/src/profiling/memory/sampler.h
new file mode 100644
index 0000000..879e2a2
--- /dev/null
+++ b/src/profiling/memory/sampler.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_MEMORY_SAMPLER_H_
+#define SRC_PROFILING_MEMORY_SAMPLER_H_
+
+#include <pthread.h>
+#include <stdint.h>
+
+#include <random>
+
+namespace perfetto {
+
+// This is the thread-local state needed to apply Poisson sampling to malloc
+// samples.
+//
+// We apply Poisson sampling individually to each byte. The whole
+// allocation gets accounted as often as the number of sampled bytes it
+// contains.
+//
+// Googlers see go/chrome-shp for more details about the sampling (from
+// Chrome's heap profiler).
+class ThreadLocalSamplingData {
+ public:
+  // |unhooked_free| is the deallocator KeyDestructor() uses to release this
+  // object's storage. The constructor is explicit so that a bare function
+  // pointer never converts to a ThreadLocalSamplingData implicitly.
+  explicit ThreadLocalSamplingData(void (*unhooked_free)(void*))
+      : unhooked_free_(unhooked_free) {}
+
+  // Returns number of times a sample should be accounted. Due to how the
+  // Poisson sampling works, some samples should be accounted multiple times.
+  size_t ShouldSample(size_t sz, double rate);
+
+  // Destroy a ThreadLocalSamplingData object after the pthread key has been
+  // deleted or when the thread shuts down. This uses unhooked_free passed in
+  // the constructor.
+  static void KeyDestructor(void* ptr);
+
+ private:
+  // Draws the number of bytes until the next sample point (at least 1).
+  int64_t NextSampleInterval(double rate);
+
+  void (*unhooked_free_)(void*);
+  // Bytes left until the next sample point. Starts at 0, so the first
+  // allocation a thread reports is always sampled.
+  int64_t interval_to_next_sample_ = 0;
+  // Default-constructed, i.e. not explicitly seeded.
+  std::default_random_engine random_engine_;
+};
+
+// Returns number of times a sample should be accounted. Due to how the
+// Poisson sampling works, some samples should be accounted multiple times.
+//
+// Delegate to this thread's ThreadLocalSamplingData.
+//
+// We have to pass through the real malloc in order to allocate the TLS.
+size_t ShouldSample(pthread_key_t key,
+ size_t sz,
+ double rate,
+ void* (*unhooked_malloc)(size_t),
+ void (*unhooked_free)(void*));
+
+} // namespace perfetto
+
+#endif // SRC_PROFILING_MEMORY_SAMPLER_H_
diff --git a/src/profiling/memory/sampler_unittest.cc b/src/profiling/memory/sampler_unittest.cc
new file mode 100644
index 0000000..d598e71
--- /dev/null
+++ b/src/profiling/memory/sampler_unittest.cc
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/sampler.h"
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+namespace perfetto {
+namespace {
+
+TEST(SamplerTest, TestLarge) {
+  // An allocation at least as large as the sampling rate is always accounted
+  // exactly once.
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  const size_t num_samples = ShouldSample(key, 1024, 512, malloc, free);
+  EXPECT_EQ(num_samples, 1);
+  pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmall) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  // As we initialize interval_to_next_sample_ with 0, the first sample
+  // should always get sampled.
+  EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+  // The call above allocated this thread's sampling state. Deleting a key
+  // does not run its destructor, so release the state explicitly instead of
+  // leaking it.
+  void* tls = pthread_getspecific(key);
+  if (tls != nullptr) {
+    pthread_setspecific(key, nullptr);
+    ThreadLocalSamplingData::KeyDestructor(tls);
+  }
+  pthread_key_delete(key);
+}
+
+TEST(SamplerTest, TestSmallFromThread) {
+  pthread_key_t key;
+  ASSERT_EQ(pthread_key_create(&key, ThreadLocalSamplingData::KeyDestructor),
+            0);
+  // The threads should have separate state: as interval_to_next_sample_ is
+  // initialized with 0, the first sample on each of them should always get
+  // sampled.
+  auto expect_first_sampled = [key] {
+    EXPECT_EQ(ShouldSample(key, 1, 512, malloc, free), 1);
+  };
+  std::thread first(expect_first_sampled);
+  std::thread second(expect_first_sampled);
+  first.join();
+  second.join();
+  pthread_key_delete(key);
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/profiling/memory/socket_listener_unittest.cc b/src/profiling/memory/socket_listener_unittest.cc
index 58d5e8f..dd3c1a7 100644
--- a/src/profiling/memory/socket_listener_unittest.cc
+++ b/src/profiling/memory/socket_listener_unittest.cc
@@ -71,8 +71,10 @@
base::ScopedFile(open("/dev/null", O_RDONLY))};
int raw_fds[2] = {*fds[0], *fds[1]};
ASSERT_TRUE(client_socket->Send(&size, sizeof(size), raw_fds,
- base::ArraySize(raw_fds)));
- ASSERT_TRUE(client_socket->Send("1", 1));
+ base::ArraySize(raw_fds),
+ base::UnixSocket::BlockingMode::kBlocking));
+ ASSERT_TRUE(client_socket->Send("1", 1, -1,
+ base::UnixSocket::BlockingMode::kBlocking));
task_runner.RunUntilCheckpoint("callback.called");
}
diff --git a/src/profiling/memory/unwinding.cc b/src/profiling/memory/unwinding.cc
index 1dcc8c0..8ed2780 100644
--- a/src/profiling/memory/unwinding.cc
+++ b/src/profiling/memory/unwinding.cc
@@ -238,6 +238,8 @@
FreeRecord& free_rec = rec->free_record;
FreePageEntry* entries = free_rec.metadata->entries;
uint64_t num_entries = free_rec.metadata->num_entries;
+ if (num_entries > kFreePageSize)
+ return;
for (size_t i = 0; i < num_entries; ++i) {
const FreePageEntry& entry = entries[i];
metadata->heap_dump.RecordFree(entry.addr, entry.sequence_number);
diff --git a/src/profiling/memory/wire_protocol.cc b/src/profiling/memory/wire_protocol.cc
index 39373f9..255bae2 100644
--- a/src/profiling/memory/wire_protocol.cc
+++ b/src/profiling/memory/wire_protocol.cc
@@ -17,6 +17,7 @@
#include "src/profiling/memory/wire_protocol.h"
#include "perfetto/base/logging.h"
+#include "perfetto/base/unix_socket.h"
#include "perfetto/base/utils.h"
#include <sys/socket.h>
@@ -29,8 +30,8 @@
bool ViewAndAdvance(char** ptr, T** out, const char* end) {
if (end - sizeof(T) < *ptr)
return false;
- *out = reinterpret_cast<T*>(ptr);
- ptr += sizeof(T);
+ *out = reinterpret_cast<T*>(*ptr);
+ *ptr += sizeof(T);
return true;
}
} // namespace
@@ -44,9 +45,11 @@
iovecs[1].iov_base = const_cast<RecordType*>(&msg.record_type);
iovecs[1].iov_len = sizeof(msg.record_type);
if (msg.alloc_header) {
+ PERFETTO_DCHECK(msg.record_type == RecordType::Malloc);
iovecs[2].iov_base = msg.alloc_header;
iovecs[2].iov_len = sizeof(*msg.alloc_header);
} else if (msg.free_header) {
+ PERFETTO_DCHECK(msg.record_type == RecordType::Free);
iovecs[2].iov_base = msg.free_header;
iovecs[2].iov_len = sizeof(*msg.free_header);
} else {
@@ -68,7 +71,7 @@
total_size = iovecs[1].iov_len + iovecs[2].iov_len;
}
- ssize_t sent = sendmsg(sock, &hdr, MSG_NOSIGNAL);
+ ssize_t sent = base::SendMsgAll(sock, &hdr, MSG_NOSIGNAL);
return sent == static_cast<ssize_t>(total_size + sizeof(total_size));
}
@@ -77,23 +80,27 @@
char* end = buf + size;
if (!ViewAndAdvance<RecordType>(&buf, &record_type, end))
return false;
- switch (*record_type) {
- case RecordType::Malloc:
- if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
- return false;
- out->payload = buf;
- if (buf > end) {
- PERFETTO_DCHECK(false);
- return false;
- }
- out->payload_size = static_cast<size_t>(end - buf);
- break;
- case RecordType::Free:
- if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
- return false;
- break;
- }
+
+ out->payload = nullptr;
+ out->payload_size = 0;
out->record_type = *record_type;
+
+ if (*record_type == RecordType::Malloc) {
+ if (!ViewAndAdvance<AllocMetadata>(&buf, &out->alloc_header, end))
+ return false;
+ out->payload = buf;
+ if (buf > end) {
+ PERFETTO_DCHECK(false);
+ return false;
+ }
+ out->payload_size = static_cast<size_t>(end - buf);
+ } else if (*record_type == RecordType::Free) {
+ if (!ViewAndAdvance<FreeMetadata>(&buf, &out->free_header, end))
+ return false;
+ } else {
+ PERFETTO_DCHECK(false);
+ return false;
+ }
return true;
}
diff --git a/src/profiling/memory/wire_protocol_unittest.cc b/src/profiling/memory/wire_protocol_unittest.cc
new file mode 100644
index 0000000..c9e8b22
--- /dev/null
+++ b/src/profiling/memory/wire_protocol_unittest.cc
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/memory/wire_protocol.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/base/scoped_file.h"
+#include "src/profiling/memory/record_reader.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace perfetto {
+
+bool operator==(const AllocMetadata& one, const AllocMetadata& other);
+// Field-by-field equality, including the whole register_data buffer.
+bool operator==(const AllocMetadata& one, const AllocMetadata& other) {
+  const auto lhs_fields =
+      std::tie(one.sequence_number, one.alloc_size, one.alloc_address,
+               one.stack_pointer, one.stack_pointer_offset, one.arch);
+  const auto rhs_fields =
+      std::tie(other.sequence_number, other.alloc_size, other.alloc_address,
+               other.stack_pointer, other.stack_pointer_offset, other.arch);
+  if (!(lhs_fields == rhs_fields))
+    return false;
+  return memcmp(one.register_data, other.register_data,
+                kMaxRegisterDataSize) == 0;
+}
+
+bool operator==(const FreeMetadata& one, const FreeMetadata& other);
+// Two FreeMetadata compare equal when they hold the same number of entries
+// and each of the first num_entries entries matches.
+bool operator==(const FreeMetadata& one, const FreeMetadata& other) {
+  if (one.num_entries != other.num_entries)
+    return false;
+  for (size_t i = 0; i < one.num_entries; ++i) {
+    const auto& lhs = one.entries[i];
+    const auto& rhs = other.entries[i];
+    if (lhs.sequence_number != rhs.sequence_number || lhs.addr != rhs.addr)
+      return false;
+  }
+  return true;
+}
+
+namespace {
+
+RecordReader::Record ReceiveAll(int sock) {
+  // Keeps reading from |sock| until the reader reports one complete record.
+  // A failed or empty read, or a KillConnection result, aborts the test
+  // through PERFETTO_CHECK.
+  RecordReader reader;
+  RecordReader::Record record;
+  for (;;) {
+    RecordReader::ReceiveBuffer buf = reader.BeginReceive();
+    const ssize_t rd = PERFETTO_EINTR(read(sock, buf.data, buf.size));
+    PERFETTO_CHECK(rd > 0);
+    const auto status = reader.EndReceive(static_cast<size_t>(rd), &record);
+    if (status == RecordReader::Result::RecordReceived)
+      break;
+    if (status == RecordReader::Result::KillConnection)
+      PERFETTO_CHECK(false);
+    // Result::Noop: the record is not complete yet, read more.
+  }
+  return record;
+}
+
+TEST(WireProtocolTest, AllocMessage) {
+  // Round-trips a Malloc record (header plus payload) over a socketpair and
+  // checks that every part arrives unchanged.
+  int sv[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+  base::ScopedFile send_sock(sv[0]);
+  base::ScopedFile recv_sock(sv[1]);
+
+  AllocMetadata metadata = {};
+  metadata.sequence_number = 0xA1A2A3A4A5A6A7A8;
+  metadata.alloc_size = 0xB1B2B3B4B5B6B7B8;
+  metadata.alloc_address = 0xC1C2C3C4C5C6C7C8;
+  metadata.stack_pointer = 0xD1D2D3D4D5D6D7D8;
+  metadata.stack_pointer_offset = 0xE1E2E3E4E5E6E7E8;
+  metadata.arch = unwindstack::ARCH_X86;
+  for (size_t i = 0; i < kMaxRegisterDataSize; ++i)
+    metadata.register_data[i] = 0x66;
+
+  // NUL-terminated so that ASSERT_STREQ below can compare it as a string.
+  char payload[] = {0x77, 0x77, 0x77, 0x00};
+  WireMessage msg = {};
+  msg.record_type = RecordType::Malloc;
+  msg.alloc_header = &metadata;
+  msg.payload = payload;
+  msg.payload_size = sizeof(payload);
+
+  ASSERT_TRUE(SendWireMessage(*send_sock, msg));
+  RecordReader::Record record = ReceiveAll(*recv_sock);
+
+  WireMessage recv_msg;
+  char* const raw = reinterpret_cast<char*>(record.data.get());
+  ASSERT_TRUE(ReceiveWireMessage(raw, record.size, &recv_msg));
+  ASSERT_EQ(recv_msg.record_type, msg.record_type);
+  ASSERT_EQ(*recv_msg.alloc_header, *msg.alloc_header);
+  ASSERT_EQ(recv_msg.payload_size, msg.payload_size);
+  ASSERT_STREQ(recv_msg.payload, msg.payload);
+}
+
+TEST(WireProtocolTest, FreeMessage) {
+  // Round-trips a Free record carrying a full page of entries over a
+  // socketpair and checks that it arrives unchanged.
+  int sv[2];
+  ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
+  base::ScopedFile send_sock(sv[0]);
+  base::ScopedFile recv_sock(sv[1]);
+
+  FreeMetadata metadata = {};
+  metadata.num_entries = kFreePageSize;
+  for (size_t i = 0; i < kFreePageSize; ++i) {
+    auto& entry = metadata.entries[i];
+    entry.sequence_number = 0x111111111111111;
+    entry.addr = 0x222222222222222;
+  }
+
+  WireMessage msg = {};
+  msg.record_type = RecordType::Free;
+  msg.free_header = &metadata;
+
+  ASSERT_TRUE(SendWireMessage(*send_sock, msg));
+  RecordReader::Record record = ReceiveAll(*recv_sock);
+
+  WireMessage recv_msg;
+  char* const raw = reinterpret_cast<char*>(record.data.get());
+  ASSERT_TRUE(ReceiveWireMessage(raw, record.size, &recv_msg));
+  ASSERT_EQ(recv_msg.record_type, msg.record_type);
+  ASSERT_EQ(*recv_msg.free_header, *msg.free_header);
+  ASSERT_EQ(recv_msg.payload_size, msg.payload_size);
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index 978f128..a3da82c 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -90,6 +90,7 @@
deps = [
"../../buildtools:sqlite",
"../../gn:default_deps",
+ "../../include/perfetto/traced:sys_stats_counters",
"../../protos/perfetto/trace:lite",
"../../protos/perfetto/trace_processor:lite",
"../base",
diff --git a/src/trace_processor/counters_table.cc b/src/trace_processor/counters_table.cc
index cd14cc4..b01400f 100644
--- a/src/trace_processor/counters_table.cc
+++ b/src/trace_processor/counters_table.cc
@@ -102,6 +102,18 @@
sqlite3_result_text(context, "utid", -1, nullptr);
break;
}
+ case RefType::kNoRef: {
+ sqlite3_result_null(context);
+ break;
+ }
+ case RefType::kIrq: {
+ sqlite3_result_text(context, "irq", -1, nullptr);
+ break;
+ }
+ case RefType::kSoftIrq: {
+ sqlite3_result_text(context, "softirq", -1, nullptr);
+ break;
+ }
}
break;
}
diff --git a/src/trace_processor/proto_trace_parser.cc b/src/trace_processor/proto_trace_parser.cc
index ac1b65c..c82982a 100644
--- a/src/trace_processor/proto_trace_parser.cc
+++ b/src/trace_processor/proto_trace_parser.cc
@@ -24,6 +24,7 @@
#include "perfetto/base/string_view.h"
#include "perfetto/base/utils.h"
#include "perfetto/protozero/proto_decoder.h"
+#include "perfetto/traced/sys_stats_counters.h"
#include "src/trace_processor/process_tracker.h"
#include "src/trace_processor/sched_tracker.h"
#include "src/trace_processor/slice_tracker.h"
@@ -102,11 +103,37 @@
ProtoTraceParser::ProtoTraceParser(TraceProcessorContext* context)
: context_(context),
- cpu_freq_name_id_(context->storage->InternString("cpufreq")) {}
+ cpu_freq_name_id_(context->storage->InternString("cpufreq")),
+ num_forks_name_id_(context->storage->InternString("num_forks")),
+ num_irq_total_name_id_(context->storage->InternString("num_irq_total")),
+ num_softirq_total_name_id_(
+ context->storage->InternString("num_softirq_total")),
+ num_irq_name_id_(context->storage->InternString("num_irq")),
+ num_softirq_name_id_(context->storage->InternString("num_softirq")),
+ cpu_times_user_ns_id_(
+ context->storage->InternString("cpu.times.user_ns")),
+ cpu_times_user_ice_ns_id_(
+ context->storage->InternString("cpu.times.user_ice_ns")),
+ cpu_times_system_mode_ns_id_(
+ context->storage->InternString("cpu.times.system_mode_ns")),
+ cpu_times_idle_ns_id_(
+ context->storage->InternString("cpu.times.idle_ns")),
+ cpu_times_io_wait_ns_id_(
+ context->storage->InternString("cpu.times.io_wait_ns")),
+ cpu_times_irq_ns_id_(context->storage->InternString("cpu.times.irq_ns")),
+ cpu_times_softirq_ns_id_(
+ context->storage->InternString("cpu.times.softirq_ns")) {
+ for (const auto& name : BuildMeminfoCounterNames()) {
+ meminfo_strs_id_.emplace_back(context->storage->InternString(name));
+ }
+ for (const auto& name : BuildVmstatCounterNames()) {
+ vmstat_strs_id_.emplace_back(context->storage->InternString(name));
+ }
+}
ProtoTraceParser::~ProtoTraceParser() = default;
-void ProtoTraceParser::ParseTracePacket(TraceBlobView packet) {
+void ProtoTraceParser::ParseTracePacket(uint64_t ts, TraceBlobView packet) {
ProtoDecoder decoder(packet.data(), packet.length());
for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
@@ -116,6 +143,11 @@
ParseProcessTree(packet.slice(fld_off, fld.size()));
break;
}
+ case protos::TracePacket::kSysStatsFieldNumber: {
+ const size_t fld_off = packet.offset_of(fld.data());
+ ParseSysStats(ts, packet.slice(fld_off, fld.size()));
+ break;
+ }
default:
break;
}
@@ -123,6 +155,192 @@
PERFETTO_DCHECK(decoder.IsEndOfBuffer());
}
+void ProtoTraceParser::ParseSysStats(uint64_t ts, TraceBlobView stats) {
+  // Walks the fields of a SysStats message: nested messages are handed to
+  // their dedicated parsers, the scalar totals are pushed directly as
+  // counters with no reference (RefType::kNoRef).
+  ProtoDecoder decoder(stats.data(), stats.length());
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    // View over the bytes of the current field, for the nested parsers.
+    auto nested = [&stats, &fld] {
+      return stats.slice(stats.offset_of(fld.data()), fld.size());
+    };
+    // Pushes the current field's value under |name_id| without a reference.
+    auto push_total = [this, ts, &fld](StringId name_id) {
+      context_->sched_tracker->PushCounter(ts, fld.as_uint32(), name_id, 0,
+                                           RefType::kNoRef);
+    };
+    switch (fld.id) {
+      case protos::SysStats::kMeminfoFieldNumber:
+        ParseMemInfo(ts, nested());
+        break;
+      case protos::SysStats::kVmstatFieldNumber:
+        ParseVmStat(ts, nested());
+        break;
+      case protos::SysStats::kCpuStatFieldNumber:
+        ParseCpuTimes(ts, nested());
+        break;
+      case protos::SysStats::kNumIrqFieldNumber:
+        ParseIrqCount(ts, nested(), /*is_softirq=*/false);
+        break;
+      case protos::SysStats::kNumSoftirqFieldNumber:
+        ParseIrqCount(ts, nested(), /*is_softirq=*/true);
+        break;
+      case protos::SysStats::kNumForksFieldNumber:
+        push_total(num_forks_name_id_);
+        break;
+      case protos::SysStats::kNumIrqTotalFieldNumber:
+        push_total(num_irq_total_name_id_);
+        break;
+      case protos::SysStats::kNumSoftirqTotalFieldNumber:
+        push_total(num_softirq_total_name_id_);
+        break;
+      default:
+        break;
+    }
+  }
+}
+void ProtoTraceParser::ParseIrqCount(uint64_t ts,
+                                     TraceBlobView irq,
+                                     bool is_soft) {
+  // Decodes one SysStats::InterruptCount message and pushes its count as a
+  // counter whose reference is the interrupt number. |is_soft| selects the
+  // softirq counter name / reference type; otherwise the hardware irq ones
+  // are used.
+  ProtoDecoder decoder(irq.data(), irq.length());
+  uint32_t key = 0;
+  uint32_t value = 0;
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::InterruptCount::kIrqFieldNumber:
+        key = fld.as_uint32();
+        break;
+      case protos::SysStats::InterruptCount::kCountFieldNumber:
+        value = fld.as_uint32();
+        break;
+    }
+  }
+  // Soft interrupts map to kSoftIrq / "num_softirq", hard interrupts to
+  // kIrq / "num_irq".
+  RefType ref_type = is_soft ? RefType::kSoftIrq : RefType::kIrq;
+  StringId name_id = is_soft ? num_softirq_name_id_ : num_irq_name_id_;
+  context_->sched_tracker->PushCounter(ts, value, name_id, key, ref_type);
+}
+
+void ProtoTraceParser::ParseMemInfo(uint64_t ts, TraceBlobView mem) {
+  // Decodes one SysStats::MeminfoValue (key + value) and pushes the value as
+  // a counter named after the key, with no reference. Keys outside the table
+  // of interned meminfo names are logged and dropped.
+  ProtoDecoder decoder(mem.data(), mem.length());
+  uint32_t key = 0;
+  uint32_t value = 0;
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::MeminfoValue::kKeyFieldNumber:
+        key = fld.as_uint32();
+        break;
+      case protos::SysStats::MeminfoValue::kValueFieldNumber:
+        value = fld.as_uint32();
+        break;
+    }
+  }
+  if (PERFETTO_UNLIKELY(key >= meminfo_strs_id_.size())) {
+    // |key| is unsigned, so it is printed with %u rather than %d.
+    PERFETTO_ELOG("MemInfo key %u is not recognized.", key);
+    return;
+  }
+  context_->sched_tracker->PushCounter(ts, value, meminfo_strs_id_[key], 0,
+                                       RefType::kNoRef);
+}
+
+void ProtoTraceParser::ParseVmStat(uint64_t ts, TraceBlobView stat) {
+  // Decodes one SysStats::VmstatValue (key + value) and pushes the value as
+  // a counter named after the key, with no reference. Keys outside the table
+  // of interned vmstat names are logged and dropped.
+  ProtoDecoder decoder(stat.data(), stat.length());
+  uint32_t key = 0;
+  uint32_t value = 0;
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    switch (fld.id) {
+      case protos::SysStats::VmstatValue::kKeyFieldNumber:
+        key = fld.as_uint32();
+        break;
+      case protos::SysStats::VmstatValue::kValueFieldNumber:
+        value = fld.as_uint32();
+        break;
+    }
+  }
+  if (PERFETTO_UNLIKELY(key >= vmstat_strs_id_.size())) {
+    // |key| is unsigned, so it is printed with %u rather than %d.
+    PERFETTO_ELOG("VmStat key %u is not recognized.", key);
+    return;
+  }
+  context_->sched_tracker->PushCounter(ts, value, vmstat_strs_id_[key], 0,
+                                       RefType::kNoRef);
+}
+
+void ProtoTraceParser::ParseCpuTimes(uint64_t ts, TraceBlobView cpu_times) {
+  // Emits one counter per recognized time field of a SysStats::CpuTimes
+  // message, each one referencing the CPU id carried by the message.
+  ProtoDecoder decoder(cpu_times.data(), cpu_times.length());
+  uint64_t cpu = 0;
+  // Speculate on CPU being first.
+  constexpr auto kCpuFieldTag = protozero::proto_utils::MakeTagVarInt(
+      protos::SysStats::CpuTimes::kCpuIdFieldNumber);
+  const bool cpu_id_is_first = cpu_times.length() > 2 &&
+                               cpu_times.data()[0] == kCpuFieldTag &&
+                               cpu_times.data()[1] < 0x80;
+  if (cpu_id_is_first) {
+    cpu = cpu_times.data()[1];
+  } else if (!PERFETTO_LIKELY((
+                 decoder.FindIntField<
+                     protos::SysStats::CpuTimes::kCpuIdFieldNumber>(&cpu)))) {
+    PERFETTO_ELOG("CPU field not found in CpuTimes");
+    return;
+  }
+
+  for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
+    // Map the field to its counter name; any other field is skipped.
+    StringId name_id = 0;
+    switch (fld.id) {
+      case protos::SysStats::CpuTimes::kUserNsFieldNumber:
+        name_id = cpu_times_user_ns_id_;
+        break;
+      case protos::SysStats::CpuTimes::kUserIceNsFieldNumber:
+        name_id = cpu_times_user_ice_ns_id_;
+        break;
+      case protos::SysStats::CpuTimes::kSystemModeNsFieldNumber:
+        name_id = cpu_times_system_mode_ns_id_;
+        break;
+      case protos::SysStats::CpuTimes::kIdleNsFieldNumber:
+        name_id = cpu_times_idle_ns_id_;
+        break;
+      case protos::SysStats::CpuTimes::kIoWaitNsFieldNumber:
+        name_id = cpu_times_io_wait_ns_id_;
+        break;
+      case protos::SysStats::CpuTimes::kIrqNsFieldNumber:
+        name_id = cpu_times_irq_ns_id_;
+        break;
+      case protos::SysStats::CpuTimes::kSoftirqNsFieldNumber:
+        name_id = cpu_times_softirq_ns_id_;
+        break;
+      default:
+        continue;
+    }
+    const uint32_t value = fld.as_uint32();
+    context_->sched_tracker->PushCounter(ts, value, name_id, cpu,
+                                         RefType::kCPU_ID);
+  }
+}
+
void ProtoTraceParser::ParseProcessTree(TraceBlobView pstree) {
ProtoDecoder decoder(pstree.data(), pstree.length());
diff --git a/src/trace_processor/proto_trace_parser.h b/src/trace_processor/proto_trace_parser.h
index 9da91e4..73b3682 100644
--- a/src/trace_processor/proto_trace_parser.h
+++ b/src/trace_processor/proto_trace_parser.h
@@ -54,7 +54,7 @@
virtual ~ProtoTraceParser();
// virtual for testing.
- virtual void ParseTracePacket(TraceBlobView);
+ virtual void ParseTracePacket(uint64_t timestamp, TraceBlobView);
virtual void ParseFtracePacket(uint32_t cpu,
uint64_t timestamp,
TraceBlobView);
@@ -64,10 +64,29 @@
void ParsePrint(uint32_t cpu, uint64_t timestamp, TraceBlobView);
void ParseThread(TraceBlobView);
void ParseProcess(TraceBlobView);
+ void ParseSysStats(uint64_t ts, TraceBlobView);
+ void ParseMemInfo(uint64_t ts, TraceBlobView);
+ void ParseVmStat(uint64_t ts, TraceBlobView);
+ void ParseCpuTimes(uint64_t ts, TraceBlobView);
+ void ParseIrqCount(uint64_t ts, TraceBlobView, bool is_soft);
private:
TraceProcessorContext* context_;
const StringId cpu_freq_name_id_;
+ const StringId num_forks_name_id_;
+ const StringId num_irq_total_name_id_;
+ const StringId num_softirq_total_name_id_;
+ const StringId num_irq_name_id_;
+ const StringId num_softirq_name_id_;
+ const StringId cpu_times_user_ns_id_;
+ const StringId cpu_times_user_ice_ns_id_;
+ const StringId cpu_times_system_mode_ns_id_;
+ const StringId cpu_times_idle_ns_id_;
+ const StringId cpu_times_io_wait_ns_id_;
+ const StringId cpu_times_irq_ns_id_;
+ const StringId cpu_times_softirq_ns_id_;
+ std::vector<StringId> meminfo_strs_id_;
+ std::vector<StringId> vmstat_strs_id_;
};
} // namespace trace_processor
diff --git a/src/trace_processor/proto_trace_parser_unittest.cc b/src/trace_processor/proto_trace_parser_unittest.cc
index c0881c8..e502da5 100644
--- a/src/trace_processor/proto_trace_parser_unittest.cc
+++ b/src/trace_processor/proto_trace_parser_unittest.cc
@@ -36,6 +36,7 @@
using ::testing::ElementsAreArray;
using ::testing::Eq;
using ::testing::Pointwise;
+using ::testing::NiceMock;
class MockSchedTracker : public SchedTracker {
public:
@@ -79,7 +80,7 @@
class ProtoTraceParserTest : public ::testing::Test {
public:
ProtoTraceParserTest() {
- storage_ = new MockTraceStorage();
+ storage_ = new NiceMock<MockTraceStorage>();
context_.storage.reset(storage_);
sched_ = new MockSchedTracker(&context_);
context_.sched_tracker.reset(sched_);
@@ -102,7 +103,7 @@
TraceProcessorContext context_;
MockSchedTracker* sched_;
MockProcessTracker* process_;
- MockTraceStorage* storage_;
+ NiceMock<MockTraceStorage>* storage_;
};
TEST_F(ProtoTraceParserTest, LoadSingleEvent) {
@@ -232,6 +233,36 @@
Tokenize(trace_2);
}
+TEST_F(ProtoTraceParserTest, LoadMemInfo) {
+  // A single meminfo entry must be pushed as one counter with no reference.
+  protos::Trace trace;
+  auto* packet = trace.add_packet();
+  const uint64_t ts = 1000;
+  packet->set_timestamp(ts);
+  auto* meminfo = packet->mutable_sys_stats()->add_meminfo();
+  meminfo->set_key(perfetto::protos::MEMINFO_MEM_TOTAL);
+  const uint32_t value = 10;
+  meminfo->set_value(value);
+
+  EXPECT_CALL(*sched_, PushCounter(ts, value, 0, 0, RefType::kNoRef));
+  Tokenize(trace);
+}
+
+TEST_F(ProtoTraceParserTest, LoadVmStats) {
+  // A single vmstat entry must be pushed as one counter with no reference.
+  protos::Trace trace;
+  auto* packet = trace.add_packet();
+  const uint64_t ts = 1000;
+  packet->set_timestamp(ts);
+  auto* vmstat = packet->mutable_sys_stats()->add_vmstat();
+  vmstat->set_key(perfetto::protos::VMSTAT_COMPACT_SUCCESS);
+  const uint32_t value = 10;
+  vmstat->set_value(value);
+
+  EXPECT_CALL(*sched_, PushCounter(ts, value, 0, 0, RefType::kNoRef));
+  Tokenize(trace);
+}
+
TEST_F(ProtoTraceParserTest, LoadCpuFreq) {
protos::Trace trace_1;
auto* bundle = trace_1.add_packet()->mutable_ftrace_events();
diff --git a/src/trace_processor/proto_trace_tokenizer.cc b/src/trace_processor/proto_trace_tokenizer.cc
index 730f9d4..f3284c9 100644
--- a/src/trace_processor/proto_trace_tokenizer.cc
+++ b/src/trace_processor/proto_trace_tokenizer.cc
@@ -133,9 +133,28 @@
}
void ProtoTraceTokenizer::ParsePacket(TraceBlobView packet) {
+ constexpr auto kTimestampFieldNumber =
+ protos::TracePacket::kTimestampFieldNumber;
ProtoDecoder decoder(packet.data(), packet.length());
+ uint64_t timestamp = 0;
+ bool timestamp_found = false;
- // TODO(taylori): Add a timestamp to TracePacket and read it here.
+ // Speculate on the fact that the timestamp is often the 1st field of the
+ // packet.
+ constexpr auto timestampFieldTag = MakeTagVarInt(kTimestampFieldNumber);
+ if (PERFETTO_LIKELY(packet.length() > 10 &&
+ packet.data()[0] == timestampFieldTag)) {
+ // Fastpath.
+ const uint8_t* next =
+ ParseVarInt(packet.data() + 1, packet.data() + 11, &timestamp);
+ timestamp_found = next != packet.data() + 1;
+ decoder.Reset(next);
+ } else {
+ // Slowpath.
+ timestamp_found = decoder.FindIntField<kTimestampFieldNumber>(&timestamp);
+ }
+ if (timestamp_found)
+ last_timestamp_ = timestamp;
// TODO(primiano): this can be optimized for the ftrace case.
for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
diff --git a/src/trace_processor/trace_processor_shell.cc b/src/trace_processor/trace_processor_shell.cc
index 2ee3e27..ed2d11f 100644
--- a/src/trace_processor/trace_processor_shell.cc
+++ b/src/trace_processor/trace_processor_shell.cc
@@ -38,6 +38,8 @@
#if PERFETTO_BUILDFLAG(PERFETTO_STANDALONE_BUILD)
#include <linenoise.h>
+#include <pwd.h>
+#include <sys/types.h>
#endif
#if PERFETTO_HAS_SIGNAL_H()
@@ -52,13 +54,54 @@
#if PERFETTO_BUILDFLAG(PERFETTO_STANDALONE_BUILD)
+bool EnsureDir(const std::string& path) {
+  // A path that already exists (EEXIST) counts as success.
+  if (mkdir(path.c_str(), 0755) != -1)
+    return true;
+  return errno == EEXIST;
+}
+
+bool EnsureFile(const std::string& path) {
+  // Opens |path| with O_CREAT and reports whether a valid descriptor came
+  // back; the descriptor itself is not kept.
+  // NOTE(review): no creation mode is passed alongside O_CREAT here; confirm
+  // that base::OpenFile supplies one.
+  auto fd = base::OpenFile(path, O_RDONLY | O_CREAT);
+  return fd.get() != -1;
+}
+
+std::string GetConfigPath() {
+  // Resolves "<home>/.config", taking the home directory from $HOME or, when
+  // that is unset, from the current user's passwd entry. Returns "" if
+  // neither source yields a directory.
+  const char* homedir = getenv("HOME");
+  if (homedir == nullptr) {
+    // getpwuid() returns null when the uid has no passwd entry (or on
+    // error), so check it before reading pw_dir.
+    const struct passwd* pw = getpwuid(getuid());
+    if (pw != nullptr)
+      homedir = pw->pw_dir;
+  }
+  if (homedir == nullptr)
+    return "";
+  return std::string(homedir) + "/.config";
+}
+
+std::string GetPerfettoPath() {
+  // An empty config path is passed through unchanged.
+  const std::string config_dir = GetConfigPath();
+  if (config_dir.empty())
+    return "";
+  return config_dir + "/perfetto";
+}
+
+std::string GetHistoryPath() {
+  // An empty perfetto directory path is passed through unchanged.
+  const std::string perfetto_dir = GetPerfettoPath();
+  if (perfetto_dir.empty())
+    return "";
+  return perfetto_dir + "/.trace_processor_shell_history";
+}
+
void SetupLineEditor() {
linenoiseSetMultiLine(true);
linenoiseHistorySetMaxLen(1000);
+
+ bool success = GetHistoryPath() != "";
+ success = success && EnsureDir(GetConfigPath());
+ success = success && EnsureDir(GetPerfettoPath());
+ success = success && EnsureFile(GetHistoryPath());
+ success = success && linenoiseHistoryLoad(GetHistoryPath().c_str()) != -1;
+ if (!success) {
+ PERFETTO_PLOG("Could not load history from %s", GetHistoryPath().c_str());
+ }
}
void FreeLine(char* line) {
linenoiseHistoryAdd(line);
+ linenoiseHistorySave(GetHistoryPath().c_str());
linenoiseFree(line);
}
diff --git a/src/trace_processor/trace_sorter.cc b/src/trace_processor/trace_sorter.cc
index ea5c927..1158734 100644
--- a/src/trace_processor/trace_sorter.cc
+++ b/src/trace_processor/trace_sorter.cc
@@ -70,7 +70,7 @@
next_stage->ParseFtracePacket(it->cpu, it->timestamp,
std::move(it->blob_view));
} else {
- next_stage->ParseTracePacket(std::move(it->blob_view));
+ next_stage->ParseTracePacket(it->timestamp, std::move(it->blob_view));
}
}
diff --git a/src/trace_processor/trace_sorter_unittest.cc b/src/trace_processor/trace_sorter_unittest.cc
index 8716fbb..49fc021 100644
--- a/src/trace_processor/trace_sorter_unittest.cc
+++ b/src/trace_processor/trace_sorter_unittest.cc
@@ -28,6 +28,7 @@
using ::testing::_;
using ::testing::InSequence;
+using ::testing::NiceMock;
class MockTraceParser : public ProtoTraceParser {
public:
@@ -45,10 +46,11 @@
MOCK_ParseFtracePacket(cpu, timestamp, tbv.data(), tbv.length());
}
- MOCK_METHOD2(MOCK_ParseTracePacket, void(const uint8_t* data, size_t length));
+ MOCK_METHOD3(MOCK_ParseTracePacket,
+ void(uint64_t ts, const uint8_t* data, size_t length));
- void ParseTracePacket(TraceBlobView tbv) override {
- MOCK_ParseTracePacket(tbv.data(), tbv.length());
+ void ParseTracePacket(uint64_t ts, TraceBlobView tbv) override {
+ MOCK_ParseTracePacket(ts, tbv.data(), tbv.length());
}
};
@@ -63,7 +65,7 @@
public:
TraceSorterTest()
: test_buffer_(std::unique_ptr<uint8_t[]>(new uint8_t[8]), 0, 8) {
- storage_ = new MockTraceStorage();
+ storage_ = new NiceMock<MockTraceStorage>();
context_.storage.reset(storage_);
context_.sorter.reset(
new TraceSorter(&context_, GetParam(), 0 /*window_size*/));
@@ -74,7 +76,7 @@
protected:
TraceProcessorContext context_;
MockTraceParser* parser_;
- MockTraceStorage* storage_;
+ NiceMock<MockTraceStorage>* storage_;
TraceBlobView test_buffer_;
};
@@ -93,7 +95,7 @@
TEST_P(TraceSorterTest, TestTracePacket) {
TraceBlobView view = test_buffer_.slice(0, 1);
- EXPECT_CALL(*parser_, MOCK_ParseTracePacket(view.data(), 1));
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, view.data(), 1));
context_.sorter->PushTracePacket(1000, std::move(view));
context_.sorter->FlushEventsForced();
}
@@ -107,8 +109,8 @@
InSequence s;
EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view_1.data(), 1));
- EXPECT_CALL(*parser_, MOCK_ParseTracePacket(view_2.data(), 2));
- EXPECT_CALL(*parser_, MOCK_ParseTracePacket(view_3.data(), 3));
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, view_2.data(), 2));
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
context_.sorter->set_window_ns_for_testing(200);
diff --git a/src/trace_processor/trace_storage.h b/src/trace_processor/trace_storage.h
index fb4e92c..9a86cf4 100644
--- a/src/trace_processor/trace_storage.h
+++ b/src/trace_processor/trace_storage.h
@@ -43,7 +43,7 @@
// StringId is an offset into |string_pool_|.
using StringId = size_t;
-enum RefType { kUTID = 0, kCPU_ID = 1 };
+enum RefType { kNoRef = 0, kUTID = 1, kCPU_ID = 2, kIrq = 3, kSoftIrq = 4 };
// Stores a data inside a trace file in a columnar form. This makes it efficient
// to read or search across a single field of the trace (e.g. all the thread
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index 02fcbf8..9d3a098 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -131,6 +131,11 @@
InodeFileDataSource::~InodeFileDataSource() = default;
+// ProbesDataSource implementation. Intentionally empty.
+void InodeFileDataSource::Start() {
+ // Nothing special to do, this data source is only reacting to on-demand
+ // events such as OnInodes().
+}
+
void InodeFileDataSource::AddInodesFromStaticMap(
BlockDeviceID block_device_id,
std::set<Inode>* inode_numbers) {
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 181dfda..143df66 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -79,12 +79,14 @@
void AddInodesFromLRUCache(BlockDeviceID block_device_id,
std::set<Inode>* inode_numbers);
- void Flush() override;
-
virtual void FillInodeEntry(InodeFileMap* destination,
Inode inode_number,
const InodeMapValue& inode_map_value);
+ // ProbesDataSource implementation.
+ void Start() override;
+ void Flush() override;
+
protected:
std::multimap<BlockDeviceID, std::string> mount_points_;
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.cc b/src/traced/probes/ftrace/ftrace_config_muxer.cc
index 6af02b4..5374ba7 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.cc
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.cc
@@ -228,23 +228,25 @@
: ftrace_(ftrace), table_(table), current_state_(), configs_() {}
FtraceConfigMuxer::~FtraceConfigMuxer() = default;
-FtraceConfigId FtraceConfigMuxer::RequestConfig(const FtraceConfig& request) {
+FtraceConfigId FtraceConfigMuxer::SetupConfig(const FtraceConfig& request) {
FtraceConfig actual;
bool is_ftrace_enabled = ftrace_->IsTracingEnabled();
if (configs_.empty()) {
+ PERFETTO_DCHECK(active_configs_.empty());
PERFETTO_DCHECK(!current_state_.tracing_on);
// If someone outside of perfetto is using ftrace give up now.
if (is_ftrace_enabled)
return 0;
- // If we're about to turn tracing on use this opportunity do some setup:
+ // Set up ftrace without starting it. Setting up the buffers can be quite
+ // slow (up to hundreds of ms).
SetupClock(request);
SetupBufferSize(request);
} else {
// Did someone turn ftrace off behind our back? If so give up.
- if (!is_ftrace_enabled)
+ if (!active_configs_.empty() && !is_ftrace_enabled)
return 0;
}
@@ -272,17 +274,34 @@
}
}
- if (configs_.empty()) {
- PERFETTO_DCHECK(!current_state_.tracing_on);
- ftrace_->EnableTracing();
- current_state_.tracing_on = true;
- }
-
FtraceConfigId id = ++last_id_;
configs_.emplace(id, std::move(actual));
return id;
}
+// Marks the config |id| (previously returned by SetupConfig()) as active and
+// turns ftrace on if it is the first active config.
+// Returns false if |id| is zero or unknown, or if ftrace is already being used
+// by someone outside of perfetto; in both cases |active_configs_| is left
+// untouched. Returns true otherwise, including when |id| was already active.
+bool FtraceConfigMuxer::ActivateConfig(FtraceConfigId id) {
+  if (!id || configs_.count(id) == 0) {
+    PERFETTO_DCHECK(false);
+    return false;
+  }
+
+  if (!active_configs_.empty()) {
+    // We are not the first (or |id| is already active): ftrace is already
+    // enabled, so just record the id. All done.
+    PERFETTO_DCHECK(current_state_.tracing_on);
+    active_configs_.insert(id);
+    return true;
+  }
+
+  PERFETTO_DCHECK(!current_state_.tracing_on);
+  if (ftrace_->IsTracingEnabled()) {
+    // If someone outside of perfetto is using ftrace give up now. |id| must not
+    // be recorded as active: RemoveConfig() disables ftrace when the last
+    // active config goes away, which would stomp on the external user.
+    return false;
+  }
+
+  // Only record the config as active once we know we own ftrace, so that
+  // ftrace is enabled by us iff |active_configs_| is not empty.
+  active_configs_.insert(id);
+  ftrace_->EnableTracing();
+  current_state_.tracing_on = true;
+  return true;
+}
+
bool FtraceConfigMuxer::RemoveConfig(FtraceConfigId id) {
if (!id || !configs_.erase(id))
return false;
@@ -305,13 +324,25 @@
current_state_.ftrace_events.erase(name);
}
- if (configs_.empty()) {
+ // If there aren't any more active configs, disable ftrace.
+ auto active_it = active_configs_.find(id);
+ if (active_it != active_configs_.end()) {
PERFETTO_DCHECK(current_state_.tracing_on);
- ftrace_->DisableTracing();
+ active_configs_.erase(active_it);
+ if (active_configs_.empty()) {
+ // This was the last active config, disable ftrace.
+ ftrace_->DisableTracing();
+ current_state_.tracing_on = false;
+ }
+ }
+
+ // Even if we don't have any other active configs, we might still have idle
+ // configs around. Tear down the rest of the ftrace config only if all
+ // configs are removed.
+ if (configs_.empty()) {
ftrace_->SetCpuBufferSizeInPages(0);
ftrace_->DisableAllEvents();
ftrace_->ClearTrace();
- current_state_.tracing_on = false;
if (current_state_.atrace_on)
DisableAtrace();
}
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer.h b/src/traced/probes/ftrace/ftrace_config_muxer.h
index e8164b9..85ad05b 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer.h
+++ b/src/traced/probes/ftrace/ftrace_config_muxer.h
@@ -17,6 +17,9 @@
#ifndef SRC_TRACED_PROBES_FTRACE_FTRACE_CONFIG_MUXER_H_
#define SRC_TRACED_PROBES_FTRACE_FTRACE_CONFIG_MUXER_H_
+#include <map>
+#include <set>
+
#include "src/traced/probes/ftrace/ftrace_config.h"
#include "src/traced/probes/ftrace/ftrace_controller.h"
#include "src/traced/probes/ftrace/ftrace_procfs.h"
@@ -29,7 +32,7 @@
// messing with the ftrace settings at the same time as us.
// Specifically FtraceConfigMuxer takes in a *requested* FtraceConfig
-// (|RequestConfig|), makes a best effort attempt to modify the ftrace
+// (|SetupConfig|), makes a best effort attempt to modify the ftrace
// debugfs files to honor those settings without interupting other perfetto
// traces already in progress or other users of ftrace, then returns an
// FtraceConfigId representing that config or zero on failure.
@@ -53,11 +56,14 @@
// If someone else is tracing we won't touch atrace (since it resets the
// buffer).
// To see the config you ended up with use |GetConfig|.
- FtraceConfigId RequestConfig(const FtraceConfig& request);
+ FtraceConfigId SetupConfig(const FtraceConfig& request);
+
+ // Activate ftrace for the given config (if not already active).
+ bool ActivateConfig(FtraceConfigId);
// Undo changes for the given config. Returns false iff the id is 0
// or already removed.
- bool RemoveConfig(FtraceConfigId id);
+ bool RemoveConfig(FtraceConfigId);
// public for testing
void SetupClockForTesting(const FtraceConfig& request) {
@@ -91,7 +97,15 @@
const ProtoTranslationTable* table_;
FtraceState current_state_;
+
+ // Set of all configurations. Note that not all of them might be active.
+ // When a config is present but not active, we do setup buffer sizes and
+ // events, but don't enable ftrace (i.e. tracing_on).
std::map<FtraceConfigId, FtraceConfig> configs_;
+
+ // Subset of |configs_| that are currently active. At any time ftrace is
+ // enabled iff |active_configs_| is not empty.
+ std::set<FtraceConfigId> active_configs_;
};
std::set<std::string> GetFtraceEvents(const FtraceConfig& request,
diff --git a/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc b/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc
index b67c8a0..3e3fe28 100644
--- a/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_config_muxer_unittest.cc
@@ -163,14 +163,16 @@
.Times(AnyNumber());
EXPECT_CALL(ftrace, ReadOneCharFromFile("/root/tracing_on"))
- .WillOnce(Return('0'));
+ .Times(2)
+ .WillRepeatedly(Return('0'));
EXPECT_CALL(ftrace, WriteToFile("/root/buffer_size_kb", "512"));
EXPECT_CALL(ftrace, WriteToFile("/root/trace_clock", "boot"));
EXPECT_CALL(ftrace, WriteToFile("/root/tracing_on", "1"));
EXPECT_CALL(ftrace,
WriteToFile("/root/events/sched/sched_switch/enable", "1"));
- FtraceConfigId id = model.RequestConfig(config);
+ FtraceConfigId id = model.SetupConfig(config);
ASSERT_TRUE(id);
+ ASSERT_TRUE(model.ActivateConfig(id));
const FtraceConfig* actual_config = model.GetConfig(id);
EXPECT_TRUE(actual_config);
@@ -198,7 +200,7 @@
// If someone is using ftrace already don't stomp on what they are doing.
EXPECT_CALL(ftrace, ReadOneCharFromFile("/root/tracing_on"))
.WillOnce(Return('1'));
- FtraceConfigId id = model.RequestConfig(config);
+ FtraceConfigId id = model.SetupConfig(config);
ASSERT_FALSE(id);
}
@@ -219,7 +221,7 @@
{"atrace", "--async_start", "--only_userspace", "sched"})))
.WillOnce(Return(true));
- FtraceConfigId id = model.RequestConfig(config);
+ FtraceConfigId id = model.SetupConfig(config);
ASSERT_TRUE(id);
const FtraceConfig* actual_config = model.GetConfig(id);
@@ -253,7 +255,7 @@
"com.google.android.gms.persistent,com.google.android.gms"})))
.WillOnce(Return(true));
- FtraceConfigId id = model.RequestConfig(config);
+ FtraceConfigId id = model.SetupConfig(config);
ASSERT_TRUE(id);
const FtraceConfig* actual_config = model.GetConfig(id);
diff --git a/src/traced/probes/ftrace/ftrace_controller.cc b/src/traced/probes/ftrace/ftrace_controller.cc
index 86685bc..5cff028 100644
--- a/src/traced/probes/ftrace/ftrace_controller.cc
+++ b/src/traced/probes/ftrace/ftrace_controller.cc
@@ -145,6 +145,7 @@
for (const auto* data_source : data_sources_)
ftrace_config_muxer_->RemoveConfig(data_source->config_id());
data_sources_.clear();
+ started_data_sources_.clear();
StopIfNeeded();
}
@@ -180,7 +181,7 @@
continue;
// This method reads the pipe and converts the raw ftrace data into
// protobufs using the |data_source|'s TraceWriter.
- ctrl->cpu_readers_[cpu]->Drain(ctrl->data_sources_);
+ ctrl->cpu_readers_[cpu]->Drain(ctrl->started_data_sources_);
ctrl->OnDrainCpuForTesting(cpu);
}
@@ -206,9 +207,9 @@
}
void FtraceController::StartIfNeeded() {
- if (data_sources_.size() > 1)
+ if (started_data_sources_.size() > 1)
return;
- PERFETTO_CHECK(!data_sources_.empty());
+ PERFETTO_DCHECK(!started_data_sources_.empty());
{
std::unique_lock<std::mutex> lock(lock_);
PERFETTO_CHECK(!listening_for_raw_trace_data_);
@@ -249,14 +250,16 @@
}
void FtraceController::StopIfNeeded() {
- if (!data_sources_.empty())
+ if (!started_data_sources_.empty())
return;
+
{
// Unblock any readers that are waiting for us to drain data.
std::unique_lock<std::mutex> lock(lock_);
listening_for_raw_trace_data_ = false;
cpus_to_drain_.reset();
}
+
data_drained_.notify_all();
cpu_readers_.clear();
}
@@ -297,7 +300,7 @@
if (!ValidConfig(data_source->config()))
return false;
- auto config_id = ftrace_config_muxer_->RequestConfig(data_source->config());
+ auto config_id = ftrace_config_muxer_->SetupConfig(data_source->config());
if (!config_id)
return false;
@@ -305,13 +308,27 @@
*table_, FtraceEventsAsSet(*ftrace_config_muxer_->GetConfig(config_id))));
auto it_and_inserted = data_sources_.insert(data_source);
PERFETTO_DCHECK(it_and_inserted.second);
- StartIfNeeded();
data_source->Initialize(config_id, std::move(filter));
return true;
}
+// Starts a data source: activates its ftrace config in the muxer, records it
+// in |started_data_sources_| and calls StartIfNeeded().
+// Returns false, without recording the data source as started, if
+// ActivateConfig() fails; returns true otherwise.
+bool FtraceController::StartDataSource(FtraceDataSource* data_source) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+
+ // The config id must be non-zero here (AddDataSource() hands it to the data
+ // source via Initialize()).
+ FtraceConfigId config_id = data_source->config_id();
+ PERFETTO_CHECK(config_id);
+
+ if (!ftrace_config_muxer_->ActivateConfig(config_id))
+ return false;
+
+ // Insert before StartIfNeeded(), which inspects |started_data_sources_|.
+ started_data_sources_.insert(data_source);
+ StartIfNeeded();
+ return true;
+}
+
void FtraceController::RemoveDataSource(FtraceDataSource* data_source) {
PERFETTO_DCHECK_THREAD(thread_checker_);
+ started_data_sources_.erase(data_source);
size_t removed = data_sources_.erase(data_source);
if (!removed)
return; // Can happen if AddDataSource failed (e.g. too many sessions).
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index b950e31..fae0a68 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -64,6 +64,7 @@
void ClearTrace();
bool AddDataSource(FtraceDataSource*) PERFETTO_WARN_UNUSED_RESULT;
+ bool StartDataSource(FtraceDataSource*);
void RemoveDataSource(FtraceDataSource*);
void DumpFtraceStats(FtraceStats*);
@@ -123,6 +124,7 @@
bool atrace_running_ = false;
std::map<size_t, std::unique_ptr<CpuReader>> cpu_readers_;
std::set<FtraceDataSource*> data_sources_;
+ std::set<FtraceDataSource*> started_data_sources_;
base::WeakPtrFactory<FtraceController> weak_factory_; // Keep last.
PERFETTO_THREAD_CHECKER(thread_checker_)
};
diff --git a/src/traced/probes/ftrace/ftrace_controller_unittest.cc b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
index a441b2e..9e74a0a 100644
--- a/src/traced/probes/ftrace/ftrace_controller_unittest.cc
+++ b/src/traced/probes/ftrace/ftrace_controller_unittest.cc
@@ -302,12 +302,14 @@
FtraceConfig config = CreateFtraceConfig({"foo"});
- EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
auto data_source = controller->AddFakeDataSource(config);
ASSERT_TRUE(data_source);
+ EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
+
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", "0"));
EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
.WillOnce(Return(true));
@@ -330,12 +332,16 @@
FtraceConfig configA = CreateFtraceConfig({"foo"});
FtraceConfig configB = CreateFtraceConfig({"foo", "bar"});
- EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
auto data_sourceA = controller->AddFakeDataSource(configA);
EXPECT_CALL(*controller->procfs(), WriteToFile(kBarEnablePath, "1"));
auto data_sourceB = controller->AddFakeDataSource(configB);
+
+ EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
+ ASSERT_TRUE(controller->StartDataSource(data_sourceA.get()));
+ ASSERT_TRUE(controller->StartDataSource(data_sourceB.get()));
+
data_sourceA.reset();
EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
@@ -356,10 +362,12 @@
FtraceConfig config = CreateFtraceConfig({"foo"});
EXPECT_CALL(*controller->procfs(), WriteToFile("/root/buffer_size_kb", _));
- EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "1"));
auto data_source = controller->AddFakeDataSource(config);
+ EXPECT_CALL(*controller->procfs(), WriteToFile("/root/tracing_on", "1"));
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
+
EXPECT_CALL(*controller->procfs(), WriteToFile(kFooEnablePath, "0"));
EXPECT_CALL(*controller->procfs(), ClearFile("/root/trace"))
.WillOnce(Return(true));
@@ -383,6 +391,7 @@
FtraceConfig config = CreateFtraceConfig({"foo"});
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
// Only one call to drain should be scheduled for the next drain period.
EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100));
@@ -427,6 +436,7 @@
FtraceConfig config = CreateFtraceConfig({"foo"});
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
// Test several cycles of a worker producing data and make sure the drain
// delay is consistent with the drain period.
@@ -469,6 +479,7 @@
EXPECT_CALL(*controller->runner(), PostDelayedTask(_, 100)).Times(2);
FtraceConfig config = CreateFtraceConfig({"foo"});
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
auto on_data_available = controller->GetDataAvailableCallback(0u);
std::thread worker([on_data_available] { on_data_available(); });
@@ -482,6 +493,7 @@
// Register another data source and wait for it to generate data.
data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
std::thread worker2([on_data_available] { on_data_available(); });
controller->WaitForData(0u);
@@ -506,6 +518,7 @@
WriteToFile("/root/buffer_size_kb", "512"));
FtraceConfig config = CreateFtraceConfig({"foo"});
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
}
{
@@ -515,6 +528,7 @@
FtraceConfig config = CreateFtraceConfig({"foo"});
config.set_buffer_size_kb(10 * 1024 * 1024);
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
}
{
@@ -525,6 +539,7 @@
ON_CALL(*controller->procfs(), NumberOfCpus()).WillByDefault(Return(2));
config.set_buffer_size_kb(65 * 1024);
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
}
{
@@ -534,6 +549,7 @@
FtraceConfig config = CreateFtraceConfig({"foo"});
config.set_buffer_size_kb(1);
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
}
{
@@ -543,6 +559,7 @@
FtraceConfig config = CreateFtraceConfig({"foo"});
config.set_buffer_size_kb(42);
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
}
{
@@ -553,6 +570,7 @@
ON_CALL(*controller->procfs(), NumberOfCpus()).WillByDefault(Return(2));
config.set_buffer_size_kb(42);
auto data_source = controller->AddFakeDataSource(config);
+ ASSERT_TRUE(controller->StartDataSource(data_source.get()));
}
}
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index da5391e..d7bc843 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -45,8 +45,18 @@
void FtraceDataSource::Initialize(FtraceConfigId config_id,
std::unique_ptr<EventFilter> event_filter) {
+ PERFETTO_CHECK(config_id);
config_id_ = config_id;
event_filter_ = std::move(event_filter);
+}
+
+// ProbesDataSource implementation. Asks the FtraceController, if
+// |controller_weak_| still resolves to one, to start this data source and, on
+// success, calls DumpFtraceStats(&stats_before_). Does nothing further if the
+// controller is gone or StartDataSource() fails.
+void FtraceDataSource::Start() {
+ FtraceController* ftrace = controller_weak_.get();
+ if (!ftrace)
+ return;
+ PERFETTO_CHECK(config_id_); // Must be initialized at this point.
+ if (!ftrace->StartDataSource(this))
+ return;
DumpFtraceStats(&stats_before_);
}
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index 3def5fd..e576a5a 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -62,6 +62,9 @@
// source, to inject ftrace dependencies.
void Initialize(FtraceConfigId, std::unique_ptr<EventFilter>);
+ // ProbesDataSource implementation.
+ void Start() override;
+
// Flushes the ftrace buffers into the userspace trace buffers and writes
// also ftrace stats.
void Flush() override;
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
index 3e823e0..13d47e1 100644
--- a/src/traced/probes/probes_data_source.h
+++ b/src/traced/probes/probes_data_source.h
@@ -28,10 +28,12 @@
ProbesDataSource(TracingSessionID, int type_id);
virtual ~ProbesDataSource();
+ virtual void Start() = 0;
virtual void Flush() = 0;
const TracingSessionID tracing_session_id;
const int type_id;
+ bool started = false; // Set by probes_producer.cc.
private:
ProbesDataSource(const ProbesDataSource&) = delete;
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 5b37165..d91f066 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -129,8 +129,10 @@
ConnectWithRetries(socket_name, task_runner);
}
-void ProbesProducer::StartDataSource(DataSourceInstanceID instance_id,
+void ProbesProducer::SetupDataSource(DataSourceInstanceID instance_id,
const DataSourceConfig& config) {
+ PERFETTO_DLOG("SetupDataSource(id=%" PRIu64 ", name=%s)", instance_id,
+ config.name().c_str());
PERFETTO_DCHECK(data_sources_.count(instance_id) == 0);
TracingSessionID session_id = config.tracing_session_id();
PERFETTO_CHECK(session_id > 0);
@@ -153,12 +155,28 @@
session_data_sources_.emplace(session_id, data_source.get());
data_sources_[instance_id] = std::move(data_source);
+}
+// Starts the data source registered under |instance_id| in |data_sources_|.
+// Logs an error and returns if no such data source exists; returns silently if
+// it has already been started. Otherwise arms a fatal watchdog timer when the
+// config has a non-zero trace duration, marks the data source as started and
+// calls its Start().
+void ProbesProducer::StartDataSource(DataSourceInstanceID instance_id,
+ const DataSourceConfig& config) {
+ PERFETTO_DLOG("StartDataSource(id=%" PRIu64 ", name=%s)", instance_id,
+ config.name().c_str());
+ auto it = data_sources_.find(instance_id);
+ if (it == data_sources_.end()) {
+ // Can happen if SetupDataSource() failed (e.g. ftrace was busy).
+ PERFETTO_ELOG("Data source id=%" PRIu64 " not found", instance_id);
+ return;
+ }
+ ProbesDataSource* data_source = it->second.get();
+ // Start each data source at most once.
+ if (data_source->started)
+ return;
if (config.trace_duration_ms() != 0) {
+ // Watchdog timeout: 5000 ms plus twice the configured trace duration.
uint32_t timeout = 5000 + 2 * config.trace_duration_ms();
watchdogs_.emplace(
instance_id, base::Watchdog::GetInstance()->CreateFatalTimer(timeout));
}
+ data_source->started = true;
+ data_source->Start();
}
std::unique_ptr<ProbesDataSource> ProbesProducer::CreateFtraceDataSource(
@@ -185,7 +203,7 @@
ftrace_->ClearTrace();
}
- PERFETTO_LOG("Ftrace start (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
+ PERFETTO_LOG("Ftrace setup (id=%" PRIu64 ", target_buf=%" PRIu32 ")", id,
config.target_buffer());
const BufferID buffer_id = static_cast<BufferID>(config.target_buffer());
std::unique_ptr<FtraceDataSource> data_source(new FtraceDataSource(
@@ -193,7 +211,7 @@
endpoint_->CreateTraceWriter(buffer_id)));
if (!ftrace_->AddDataSource(data_source.get())) {
PERFETTO_ELOG(
- "Failed to start tracing (too many concurrent sessions or ftrace is "
+ "Failed to setup tracing (too many concurrent sessions or ftrace is "
"already in use)");
return nullptr;
}
@@ -204,7 +222,7 @@
TracingSessionID session_id,
DataSourceInstanceID id,
DataSourceConfig source_config) {
- PERFETTO_LOG("Inode file map start (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
+ PERFETTO_LOG("Inode file map setup (id=%" PRIu64 ", target_buf=%" PRIu32 ")",
id, source_config.target_buffer());
auto buffer_id = static_cast<BufferID>(source_config.target_buffer());
if (system_inodes_.empty())
@@ -220,13 +238,8 @@
const DataSourceConfig& config) {
base::ignore_result(id);
auto buffer_id = static_cast<BufferID>(config.target_buffer());
- auto data_source =
- std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
- session_id, endpoint_->CreateTraceWriter(buffer_id), config));
- if (config.process_stats_config().scan_all_processes_on_start()) {
- data_source->WriteAllProcesses();
- }
- return std::move(data_source);
+ return std::unique_ptr<ProcessStatsDataSource>(new ProcessStatsDataSource(
+ session_id, endpoint_->CreateTraceWriter(buffer_id), config));
}
std::unique_ptr<SysStatsDataSource> ProbesProducer::CreateSysStatsDataSource(
@@ -235,16 +248,16 @@
const DataSourceConfig& config) {
base::ignore_result(id);
auto buffer_id = static_cast<BufferID>(config.target_buffer());
- auto data_source = std::unique_ptr<SysStatsDataSource>(
+ return std::unique_ptr<SysStatsDataSource>(
new SysStatsDataSource(task_runner_, session_id,
endpoint_->CreateTraceWriter(buffer_id), config));
- return data_source;
}
void ProbesProducer::StopDataSource(DataSourceInstanceID id) {
PERFETTO_LOG("Producer stop (id=%" PRIu64 ")", id);
auto it = data_sources_.find(id);
if (it == data_sources_.end()) {
+ // Can happen if SetupDataSource() failed (e.g. ftrace was busy).
PERFETTO_ELOG("Cannot stop data source id=%" PRIu64 ", not found", id);
return;
}
@@ -268,7 +281,7 @@
size_t num_data_sources) {
for (size_t i = 0; i < num_data_sources; i++) {
auto it = data_sources_.find(data_source_ids[i]);
- if (it == data_sources_.end())
+ if (it == data_sources_.end() || !it->second->started)
continue;
it->second->Flush();
}
@@ -288,7 +301,7 @@
// unordered_multimap guarantees that entries with the same key are contiguous
// in the iteration.
for (auto it = session_data_sources_.begin(); /* check below*/; it++) {
- // If this is the last iteration or this is the session id has changed,
+ // If this is the last iteration or the session id has changed,
// dispatch the metadata update to the linked data sources, if any.
if (it == session_data_sources_.end() || it->first != last_session_id) {
bool has_inodes = metadata && !metadata->inode_and_device.empty();
@@ -307,6 +320,8 @@
last_session_id = it->first;
}
ProbesDataSource* ds = it->second;
+ if (!ds->started)
+ continue;
switch (ds->type_id) {
case FtraceDataSource::kTypeId:
metadata = static_cast<FtraceDataSource*>(ds)->mutable_metadata();
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index e7b9403..14737d6 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -49,6 +49,7 @@
// Producer Impl:
void OnConnect() override;
void OnDisconnect() override;
+ void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
void StopDataSource(DataSourceInstanceID) override;
void OnTracingSetup() override;
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index 80d125b..160484f 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -75,10 +75,11 @@
const DataSourceConfig& config)
: ProbesDataSource(session_id, kTypeId),
writer_(std::move(writer)),
- config_(config),
record_thread_names_(config.process_stats_config().record_thread_names()),
+ dump_all_procs_on_start_(
+ config.process_stats_config().scan_all_processes_on_start()),
weak_factory_(this) {
- const auto& quirks = config_.process_stats_config().quirks();
+ const auto& quirks = config.process_stats_config().quirks();
enable_on_demand_dumps_ =
(std::find(quirks.begin(), quirks.end(),
ProcessStatsConfig::DISABLE_ON_DEMAND) == quirks.end());
@@ -86,6 +87,11 @@
ProcessStatsDataSource::~ProcessStatsDataSource() = default;
+// ProbesDataSource implementation. Calls WriteAllProcesses() if
+// |dump_all_procs_on_start_| is set; otherwise does nothing.
+void ProcessStatsDataSource::Start() {
+ if (dump_all_procs_on_start_)
+ WriteAllProcesses();
+}
+
base::WeakPtr<ProcessStatsDataSource> ProcessStatsDataSource::GetWeakPtr()
const {
return weak_factory_.GetWeakPtr();
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 866d2cf..57f0324 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -44,16 +44,17 @@
const DataSourceConfig&);
~ProcessStatsDataSource() override;
- const DataSourceConfig& config() const { return config_; }
-
base::WeakPtr<ProcessStatsDataSource> GetWeakPtr() const;
void WriteAllProcesses();
void OnPids(const std::vector<int32_t>& pids);
- void Flush() override;
// Virtual for testing.
virtual std::string ReadProcPidFile(int32_t pid, const std::string& file);
+ // ProbesDataSource implementation.
+ void Start() override;
+ void Flush() override;
+
private:
ProcessStatsDataSource(const ProcessStatsDataSource&) = delete;
ProcessStatsDataSource& operator=(const ProcessStatsDataSource&) = delete;
@@ -67,11 +68,11 @@
void FinalizeCurPsTree();
std::unique_ptr<TraceWriter> writer_;
- const DataSourceConfig config_;
TraceWriter::TracePacketHandle cur_packet_;
protos::pbzero::ProcessTree* cur_ps_tree_ = nullptr;
bool record_thread_names_ = false;
bool enable_on_demand_dumps_ = true;
+ bool dump_all_procs_on_start_ = false;
// This set contains PIDs as per the Linux kernel notion of a PID (which is
// really a TID). In practice this set will contain all TIDs for all processes
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.cc b/src/traced/probes/sys_stats/sys_stats_data_source.cc
index ebc3733..cd6efd1 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.cc
@@ -125,6 +125,9 @@
meminfo_ticks_ = ticks[0];
vmstat_ticks_ = ticks[1];
stat_ticks_ = ticks[2];
+}
+
+// ProbesDataSource implementation. Posts a SysStatsDataSource::Tick task,
+// bound to the pointer returned by GetWeakPtr(), on |task_runner_|.
+void SysStatsDataSource::Start() {
auto weak_this = GetWeakPtr();
task_runner_->PostTask(std::bind(&SysStatsDataSource::Tick, weak_this));
}
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.h b/src/traced/probes/sys_stats/sys_stats_data_source.h
index a5ea4f7..6081374 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.h
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.h
@@ -55,6 +55,7 @@
~SysStatsDataSource() override;
// ProbesDataSource implementation.
+ void Start() override;
void Flush() override;
base::WeakPtr<SysStatsDataSource> GetWeakPtr() const;
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc b/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
index e2c18d9..763d8d1 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source_unittest.cc
@@ -209,6 +209,7 @@
auto instance = std::unique_ptr<SysStatsDataSource>(new SysStatsDataSource(
&task_runner_, 0, std::move(writer), cfg, MockOpenReadOnly));
instance->set_ns_per_user_hz_for_testing(1000000000ull / 100); // 100 Hz.
+ instance->Start();
return instance;
}
diff --git a/src/tracing/core/patch_list.h b/src/tracing/core/patch_list.h
index 5cef17d..bdfd028 100644
--- a/src/tracing/core/patch_list.h
+++ b/src/tracing/core/patch_list.h
@@ -53,7 +53,7 @@
}
private:
- Patch& operator=(const Patch&) = default;
+ Patch& operator=(const Patch&) = delete;
Patch(Patch&&) noexcept = delete;
Patch& operator=(Patch&&) = delete;
};
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 8787672..5a0ca16 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -154,8 +154,13 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
+ // Calling StartTracing() should be a no-op (apart from a DLOG statement)
+ // because the trace config didn't have the |deferred_start| flag set.
+ consumer->StartTracing();
+
consumer->DisableTracing();
producer->WaitForDataSourceStop("data_source");
consumer->WaitForTracingDisabled();
@@ -178,6 +183,7 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
std::unique_ptr<MockProducer> producer_otheruid = CreateMockProducer();
@@ -195,6 +201,7 @@
trace_config.set_lockdown_mode(
TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR);
consumer->EnableTracing(trace_config);
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
std::unique_ptr<MockProducer> producer_otheruid2 = CreateMockProducer();
@@ -220,6 +227,7 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
// Disconnecting the consumer while tracing should trigger data source
@@ -243,6 +251,7 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
// Disconnecting and reconnecting a producer with a matching data source.
@@ -252,6 +261,7 @@
producer->Connect(svc.get(), "mock_producer_2");
producer->RegisterDataSource("data_source");
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
}
@@ -303,6 +313,7 @@
consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
static const char kPayload[] = "1234567890abcdef-";
@@ -397,12 +408,15 @@
size_t actual_page_sizes_kb[kNumProducers]{};
for (size_t i = 0; i < kNumProducers; i++) {
producer[i]->WaitForTracingSetup();
- producer[i]->WaitForDataSourceStart("data_source");
+ producer[i]->WaitForDataSourceSetup("data_source");
actual_shm_sizes_kb[i] =
producer[i]->endpoint()->shared_memory()->size() / 1024;
actual_page_sizes_kb[i] =
producer[i]->endpoint()->shared_buffer_page_size_kb();
}
+ for (size_t i = 0; i < kNumProducers; i++) {
+ producer[i]->WaitForDataSourceStart("data_source");
+ }
ASSERT_THAT(actual_page_sizes_kb, ElementsAreArray(kExpectedPageSizesKb));
ASSERT_THAT(actual_shm_sizes_kb, ElementsAreArray(kExpectedSizesKb));
}
@@ -422,6 +436,7 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
std::unique_ptr<TraceWriter> writer =
@@ -460,6 +475,7 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
std::unique_ptr<TraceWriter> writer =
@@ -498,6 +514,7 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
std::unique_ptr<TraceWriter> writer =
@@ -564,6 +581,11 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+
+ producer->WaitForDataSourceSetup("ds_will_ack_1");
+ producer->WaitForDataSourceSetup("ds_wont_ack");
+ producer->WaitForDataSourceSetup("ds_will_ack_2");
+
producer->WaitForDataSourceStart("ds_will_ack_1");
producer->WaitForDataSourceStart("ds_wont_ack");
producer->WaitForDataSourceStart("ds_will_ack_2");
@@ -636,6 +658,7 @@
consumer->EnableTracing(trace_config);
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
std::unique_ptr<TraceWriter> writer =
@@ -675,11 +698,15 @@
if (i == 0)
producer1->WaitForTracingSetup();
- producer1->WaitForDataSourceStart("ds_1A");
- producer1->WaitForDataSourceStart("ds_1B");
+ producer1->WaitForDataSourceSetup("ds_1A");
+ producer1->WaitForDataSourceSetup("ds_1B");
if (i == 0)
producer2->WaitForTracingSetup();
+ producer2->WaitForDataSourceSetup("ds_2A");
+
+ producer1->WaitForDataSourceStart("ds_1A");
+ producer1->WaitForDataSourceStart("ds_1B");
producer2->WaitForDataSourceStart("ds_2A");
auto* ds1 = producer1->GetDataSourceInstance("ds_1A");
@@ -723,6 +750,7 @@
base::TempFile tmp_file = base::TempFile::Create();
consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
producer->WaitForTracingSetup();
+ producer->WaitForDataSourceSetup("data_source");
producer->WaitForDataSourceStart("data_source");
// Write some variable length payload, waiting for sync markers every now
@@ -777,4 +805,42 @@
EXPECT_EQ(whole_trace.SerializeAsString(), merged_trace.SerializeAsString());
}
+// Creates a tracing session with |deferred_start| and checks that data sources
+// are started only after calling StartTracing().
+TEST_F(TracingServiceImplTest, DeferredStart) {
+ std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+ consumer->Connect(svc.get());
+
+ std::unique_ptr<MockProducer> producer = CreateMockProducer();
+ producer->Connect(svc.get(), "mock_producer");
+
+ // Register two data sources but enable only one of them (ds_1).
+ producer->RegisterDataSource("ds_1");
+ producer->RegisterDataSource("ds_2");
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(128);
+ trace_config.add_data_sources()->mutable_config()->set_name("ds_1");
+ trace_config.set_deferred_start(true);
+ trace_config.set_duration_ms(1);
+
+ consumer->EnableTracing(trace_config);
+ producer->WaitForTracingSetup();
+
+ producer->WaitForDataSourceSetup("ds_1");
+
+ // Make sure we don't get unexpected DataSourceStart() notifications yet.
+ task_runner.RunUntilIdle();
+
+ consumer->StartTracing();
+
+ producer->WaitForDataSourceStart("ds_1");
+
+ auto writer1 = producer->CreateTraceWriter("ds_1");
+ producer->WaitForFlush(writer1.get());
+
+ producer->WaitForDataSourceStop("ds_1");
+ consumer->WaitForTracingDisabled();
+}
+
} // namespace perfetto
diff --git a/src/tracing/core/trace_config.cc b/src/tracing/core/trace_config.cc
index b4d0081..a5cf0bb 100644
--- a/src/tracing/core/trace_config.cc
+++ b/src/tracing/core/trace_config.cc
@@ -92,6 +92,11 @@
static_cast<decltype(max_file_size_bytes_)>(proto.max_file_size_bytes());
guardrail_overrides_.FromProto(proto.guardrail_overrides());
+
+ static_assert(sizeof(deferred_start_) == sizeof(proto.deferred_start()),
+ "size mismatch");
+ deferred_start_ =
+ static_cast<decltype(deferred_start_)>(proto.deferred_start());
unknown_fields_ = proto.unknown_fields();
}
@@ -152,6 +157,11 @@
max_file_size_bytes_));
guardrail_overrides_.ToProto(proto->mutable_guardrail_overrides());
+
+ static_assert(sizeof(deferred_start_) == sizeof(proto->deferred_start()),
+ "size mismatch");
+ proto->set_deferred_start(
+ static_cast<decltype(proto->deferred_start())>(deferred_start_));
*(proto->mutable_unknown_fields()) = unknown_fields_;
}
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index 2483235..9cab744 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -238,6 +238,11 @@
}
if (cfg.enable_extra_guardrails()) {
+ if (cfg.deferred_start()) {
+ PERFETTO_ELOG(
+ "deferred_start=true is not supported in unsupervised traces");
+ return false;
+ }
if (cfg.duration_ms() > kMaxTracingDurationMillis) {
PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms > %" PRIu64
" ms)",
@@ -340,7 +345,7 @@
consumer->tracing_session_id_ = tsid;
- // Enable the data sources on the producers.
+ // Set up the data sources on the producers without starting them.
for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
// Scan all the registered data sources with a matching name.
auto range = data_sources_.equal_range(cfg_data_source.config().name());
@@ -353,13 +358,45 @@
break;
}
}
- StartDataSource(cfg_data_source, producer_config, it->second,
+ SetupDataSource(cfg_data_source, producer_config, it->second,
tracing_session);
}
}
+ tracing_session->pending_stop_acks.clear();
+ tracing_session->state = TracingSession::CONFIGURED;
+ PERFETTO_LOG(
+ "Configured tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
+ "buffer size:%zu KB, total sessions:%zu",
+ cfg.data_sources().size(), cfg.duration_ms(), cfg.buffers_size(),
+ total_buf_size_kb, tracing_sessions_.size());
+
+ // Start the data sources, unless this is a case of early setup + fast
+ // triggering, using TraceConfig.deferred_start.
+ if (!cfg.deferred_start())
+ return StartTracing(tsid);
+
+ return true;
+}
+
+bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ TracingSession* tracing_session = GetTracingSession(tsid);
+ if (!tracing_session) {
+ PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
+ return false;
+ }
+
+ if (tracing_session->state != TracingSession::CONFIGURED) {
+ PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
+ tracing_session->state);
+ return false;
+ }
+
+ tracing_session->state = TracingSession::STARTED;
+
// Trigger delayed task if the trace is time limited.
- const uint32_t trace_duration_ms = cfg.duration_ms();
+ const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
if (trace_duration_ms > 0) {
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_->PostDelayedTask(
@@ -371,7 +408,7 @@
}
// Start the periodic drain tasks if we should to save the trace into a file.
- if (cfg.write_into_file()) {
+ if (tracing_session->config.write_into_file()) {
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_->PostDelayedTask(
[weak_this, tsid] {
@@ -381,13 +418,16 @@
tracing_session->delay_to_next_write_period_ms());
}
- tracing_session->pending_stop_acks.clear();
- tracing_session->state = TracingSession::ENABLED;
- PERFETTO_LOG(
- "Enabled tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
- "buffer size:%zu KB, total sessions:%zu",
- cfg.data_sources().size(), trace_duration_ms, cfg.buffers_size(),
- total_buf_size_kb, tracing_sessions_.size());
+ for (const auto& kv : tracing_session->data_source_instances) {
+ ProducerID producer_id = kv.first;
+ const DataSourceInstance& data_source = kv.second;
+ ProducerEndpointImpl* producer = GetProducer(producer_id);
+ if (!producer) {
+ PERFETTO_DCHECK(false);
+ continue;
+ }
+ producer->StartDataSource(data_source.instance_id, data_source.config);
+ }
return true;
}
@@ -427,8 +467,15 @@
DisableTracingNotifyConsumerAndFlushFile(tracing_session);
return;
+ // Continues below.
+ case TracingSession::CONFIGURED:
+ // If the session didn't even start there is no need to orchestrate a
+ // graceful stop of data sources.
+ disable_immediately = true;
+ break;
+
// This is the nominal case, continues below.
- case TracingSession::ENABLED:
+ case TracingSession::STARTED:
break;
}
@@ -441,7 +488,7 @@
tracing_session->pending_stop_acks.insert(
std::make_pair(producer_id, ds_inst_id));
}
- producer->TearDownDataSource(ds_inst_id);
+ producer->StopDataSource(ds_inst_id);
}
tracing_session->data_source_instances.clear();
@@ -497,8 +544,9 @@
PERFETTO_DCHECK_THREAD(thread_checker_);
TracingSession* tracing_session = GetTracingSession(tsid);
if (!tracing_session ||
- tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS)
+ tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
return; // Tracing session was successfully disabled.
+ }
PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
tsid);
@@ -798,7 +846,7 @@
if (stop_writing_into_file) {
tracing_session->write_into_file.reset();
tracing_session->write_period_ms = 0;
- if (tracing_session->state == TracingSession::ENABLED)
+ if (tracing_session->state == TracingSession::STARTED)
DisableTracing(tsid);
return;
}
@@ -873,8 +921,10 @@
for (auto& iter : tracing_sessions_) {
TracingSession& tracing_session = iter.second;
- if (tracing_session.state != TracingSession::ENABLED)
+ if (tracing_session.state != TracingSession::STARTED &&
+ tracing_session.state != TracingSession::CONFIGURED) {
continue;
+ }
TraceConfig::ProducerConfig producer_config;
for (auto& config : tracing_session.config.producers()) {
@@ -885,9 +935,12 @@
}
for (const TraceConfig::DataSource& cfg_data_source :
tracing_session.config.data_sources()) {
- if (cfg_data_source.config().name() == desc.name())
- StartDataSource(cfg_data_source, producer_config, reg_ds->second,
- &tracing_session);
+ if (cfg_data_source.config().name() != desc.name())
+ continue;
+ DataSourceInstance* ds_inst = SetupDataSource(
+ cfg_data_source, producer_config, reg_ds->second, &tracing_session);
+ if (ds_inst && tracing_session.state == TracingSession::STARTED)
+ producer->StartDataSource(ds_inst->instance_id, ds_inst->config);
}
}
}
@@ -903,7 +956,7 @@
for (auto it = ds_instances.begin(); it != ds_instances.end();) {
if (it->first == producer_id && it->second.data_source_name == name) {
DataSourceInstanceID ds_inst_id = it->second.instance_id;
- producer->TearDownDataSource(ds_inst_id);
+ producer->StopDataSource(ds_inst_id);
it = ds_instances.erase(it);
} else {
++it;
@@ -926,7 +979,7 @@
PERFETTO_DCHECK(false);
}
-void TracingServiceImpl::StartDataSource(
+TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
const TraceConfig::DataSource& cfg_data_source,
const TraceConfig::ProducerConfig& producer_config,
const RegisteredDataSource& data_source,
@@ -938,7 +991,7 @@
// ftrace, we must not enable it in that case.
if (lockdown_mode_ && producer->uid_ != uid_) {
PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
- return;
+ return nullptr;
}
// TODO(primiano): Add tests for registration ordering
// (data sources vs consumers).
@@ -950,10 +1003,19 @@
PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
cfg_data_source.config().name().c_str(),
producer->name_.c_str());
- return;
+ return nullptr;
}
}
+ auto relative_buffer_id = cfg_data_source.config().target_buffer();
+ if (relative_buffer_id >= tracing_session->num_buffers()) {
+ PERFETTO_LOG(
+ "The TraceConfig for DataSource %s specified a target_buffer out of "
+ "bound (%d). Skipping it.",
+ cfg_data_source.config().name().c_str(), relative_buffer_id);
+ return nullptr;
+ }
+
// Create a copy of the DataSourceConfig specified in the trace config. This
// will be passed to the producer after translating the |target_buffer| id.
// The |target_buffer| parameter passed by the consumer in the trace config is
@@ -961,27 +1023,22 @@
// translated to the global BufferID before passing it to the producers, which
// don't know anything about tracing sessions and consumers.
- DataSourceConfig ds_config = cfg_data_source.config(); // Deliberate copy.
+ DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
+ auto insert_iter = tracing_session->data_source_instances.emplace(
+ producer->id_,
+ DataSourceInstance{inst_id,
+ cfg_data_source.config(), // Deliberate copy.
+ data_source.descriptor.name(),
+ data_source.descriptor.will_notify_on_stop()});
+ DataSourceInstance* ds_instance = &insert_iter->second;
+ DataSourceConfig& ds_config = ds_instance->config;
ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
ds_config.set_tracing_session_id(tracing_session->id);
- auto relative_buffer_id = ds_config.target_buffer();
- if (relative_buffer_id >= tracing_session->num_buffers()) {
- PERFETTO_LOG(
- "The TraceConfig for DataSource %s specified a target_buffer out of "
- "bound (%d). Skipping it.",
- ds_config.name().c_str(), relative_buffer_id);
- return;
- }
BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
PERFETTO_DCHECK(global_id);
ds_config.set_target_buffer(global_id);
- DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
- tracing_session->data_source_instances.emplace(
- producer->id_,
- DataSourceInstance{inst_id, data_source.descriptor.name(),
- data_source.descriptor.will_notify_on_stop()});
- PERFETTO_DLOG("Starting data source %s with target buffer %" PRIu16,
+ PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
ds_config.name().c_str(), global_id);
if (!producer->shared_memory()) {
// Determine the SMB page size. Must be an integer multiple of 4k.
@@ -1011,7 +1068,8 @@
producer->OnTracingSetup();
UpdateMemoryGuardrail();
}
- producer->StartDataSource(inst_id, ds_config);
+ producer->SetupDataSource(inst_id, ds_config);
+ return ds_instance;
}
// Note: all the fields % *_trusted ones are untrusted, as in, the Producer
@@ -1311,6 +1369,15 @@
NotifyOnTracingDisabled();
}
+void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ // Forward to the service only when a tracing session id is set.
+ if (tracing_session_id_)
+ service_->StartTracing(tracing_session_id_);
+ else
+ PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
+}
+
void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (!tracing_session_id_) {
@@ -1473,7 +1540,7 @@
return shared_buffer_page_size_kb_;
}
-void TracingServiceImpl::ProducerEndpointImpl::TearDownDataSource(
+void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
DataSourceInstanceID ds_inst_id) {
// TODO(primiano): When we'll support tearing down the SMB, at this point we
// should send the Producer a TearDownTracing if all its data sources have
@@ -1524,6 +1591,17 @@
});
}
+void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
+ DataSourceInstanceID ds_id,
+ const DataSourceConfig& config) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ auto weak_this = weak_ptr_factory_.GetWeakPtr();
+ // |config| is copied into the task; the copy is const, so no std::move().
+ task_runner_->PostTask([weak_this, ds_id, config] {
+ if (weak_this) weak_this->producer_->SetupDataSource(ds_id, config);
+ });
+}
+
void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
DataSourceInstanceID ds_id,
const DataSourceConfig& config) {
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index d50057e..9c325cd 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -83,8 +83,9 @@
size_t shared_buffer_page_size_kb() const override;
void OnTracingSetup();
+ void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
- void TearDownDataSource(DataSourceInstanceID);
+ void StopDataSource(DataSourceInstanceID);
void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
private:
@@ -122,6 +123,7 @@
// TracingService::ConsumerEndpoint implementation.
void EnableTracing(const TraceConfig&, base::ScopedFile) override;
+ void StartTracing() override;
void DisableTracing() override;
void ReadBuffers() override;
void FreeBuffers() override;
@@ -167,6 +169,7 @@
bool EnableTracing(ConsumerEndpointImpl*,
const TraceConfig&,
base::ScopedFile);
+ bool StartTracing(TracingSessionID);
void DisableTracing(TracingSessionID, bool disable_immediately = false);
void Flush(TracingSessionID tsid,
uint32_t timeout_ms,
@@ -201,7 +204,12 @@
// Represents an active data source for a tracing session.
struct DataSourceInstance {
+ DataSourceInstance(const DataSourceInstance&) = delete;
+ DataSourceInstance& operator=(const DataSourceInstance&) = delete;
+ DataSourceInstance(DataSourceInstance&&) noexcept = default;
+
DataSourceInstanceID instance_id;
+ DataSourceConfig config;
std::string data_source_name;
bool will_notify_on_stop;
};
@@ -215,7 +223,12 @@
// Holds the state of a tracing session. A tracing session is uniquely bound
// a specific Consumer. Each Consumer can own one or more sessions.
struct TracingSession {
- enum State { DISABLED = 0, ENABLED, DISABLING_WAITING_STOP_ACKS };
+ enum State {
+ DISABLED = 0,
+ CONFIGURED,
+ STARTED,
+ DISABLING_WAITING_STOP_ACKS
+ };
TracingSession(TracingSessionID, ConsumerEndpointImpl*, const TraceConfig&);
@@ -276,10 +289,10 @@
TracingServiceImpl(const TracingServiceImpl&) = delete;
TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
- void StartDataSource(const TraceConfig::DataSource&,
- const TraceConfig::ProducerConfig&,
- const RegisteredDataSource&,
- TracingSession*);
+ DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
+ const TraceConfig::ProducerConfig&,
+ const RegisteredDataSource&,
+ TracingSession*);
// Returns the next available ProducerID that is not in |producers_|.
ProducerID GetNextProducerID();
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index 05d67fa..cb54e43 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -87,6 +87,22 @@
consumer_port_.EnableTracing(req, std::move(async_response), *fd);
}
+void ConsumerIPCClientImpl::StartTracing() {
+ if (!connected_) {
+ PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
+ return;
+ }
+
+ ipc::Deferred<protos::StartTracingResponse> async_response;
+ async_response.Bind(
+ [](ipc::AsyncResult<protos::StartTracingResponse> response) {
+ if (!response)
+ PERFETTO_DLOG("StartTracing() failed");
+ });
+ protos::StartTracingRequest req;
+ consumer_port_.StartTracing(req, std::move(async_response));
+}
+
void ConsumerIPCClientImpl::DisableTracing() {
if (!connected_) {
PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index 4257a54..5ed9516 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -60,6 +60,7 @@
// These methods are invoked by the actual Consumer(s) code by clients of the
// tracing library, which know nothing about the IPC transport.
void EnableTracing(const TraceConfig&, base::ScopedFile) override;
+ void StartTracing() override;
void DisableTracing() override;
void ReadBuffers() override;
void FreeBuffers() override;
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.cc b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
index 2abdcf7..1d2f907 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.cc
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
@@ -95,6 +95,7 @@
PERFETTO_DLOG("Tracing service connection failure");
connected_ = false;
producer_->OnDisconnect();
+ data_sources_setup_.clear();
}
void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
@@ -109,11 +110,29 @@
void ProducerIPCClientImpl::OnServiceRequest(
const protos::GetAsyncCommandResponse& cmd) {
PERFETTO_DCHECK_THREAD(thread_checker_);
+
+ // This message is sent only when connecting to a service running Android Q+.
+ // See comment below in kStartDataSource.
+ if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupDataSource) {
+ const auto& req = cmd.setup_data_source();
+ const DataSourceInstanceID dsid = req.new_instance_id();
+ DataSourceConfig cfg;
+ cfg.FromProto(req.config());
+ data_sources_setup_.insert(dsid);
+ producer_->SetupDataSource(dsid, cfg);
+ return;
+ }
+
if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStartDataSource) {
const auto& req = cmd.start_data_source();
const DataSourceInstanceID dsid = req.new_instance_id();
DataSourceConfig cfg;
cfg.FromProto(req.config());
+ if (!data_sources_setup_.count(dsid)) {
+ // When connecting with an older (Android P) service, the service will not
+ // send a SetupDataSource message. We synthesize it here in that case.
+ producer_->SetupDataSource(dsid, cfg);
+ }
producer_->StartDataSource(dsid, cfg);
return;
}
@@ -121,6 +140,7 @@
if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStopDataSource) {
const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
producer_->StopDataSource(dsid);
+ data_sources_setup_.erase(dsid);
return;
}
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.h b/src/tracing/ipc/producer/producer_ipc_client_impl.h
index 743d632..ba749ad 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.h
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.h
@@ -19,6 +19,7 @@
#include <stdint.h>
+#include <set>
#include <vector>
#include "perfetto/base/thread_checker.h"
@@ -99,6 +100,7 @@
std::unique_ptr<PosixSharedMemory> shared_memory_;
std::unique_ptr<SharedMemoryArbiter> shared_memory_arbiter_;
size_t shared_buffer_page_size_kb_ = 0;
+ std::set<DataSourceInstanceID> data_sources_setup_;
bool connected_ = false;
std::string const name_;
PERFETTO_THREAD_CHECKER(thread_checker_)
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index d378c67..cb18004 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -71,6 +71,14 @@
}
// Called by the IPC layer.
+void ConsumerIPCService::StartTracing(const protos::StartTracingRequest&,
+ DeferredStartTracingResponse resp) {
+ RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
+ remote_consumer->service_endpoint->StartTracing();
+ resp.Resolve(ipc::AsyncResult<protos::StartTracingResponse>::Create());
+}
+
+// Called by the IPC layer.
void ConsumerIPCService::DisableTracing(const protos::DisableTracingRequest&,
DeferredDisableTracingResponse resp) {
GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index 3fdc5d6..acf7d2f 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -46,6 +46,8 @@
// ConsumerPort implementation (from .proto IPC definition).
void EnableTracing(const protos::EnableTracingRequest&,
DeferredEnableTracingResponse) override;
+ void StartTracing(const protos::StartTracingRequest&,
+ DeferredStartTracingResponse) override;
void DisableTracing(const protos::DisableTracingRequest&,
DeferredDisableTracingResponse) override;
void ReadBuffers(const protos::ReadBuffersRequest&,
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index e2c2ab9..95dcf67 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -211,6 +211,24 @@
// |service_endpoint| (in the RemoteProducer dtor).
void ProducerIPCService::RemoteProducer::OnDisconnect() {}
+// Invoked by the |core_service_| business logic when it wants to create a new
+// data source.
+void ProducerIPCService::RemoteProducer::SetupDataSource(
+ DataSourceInstanceID dsid,
+ const DataSourceConfig& cfg) {
+ if (!async_producer_commands.IsBound()) {
+ PERFETTO_DLOG(
+ "The Service tried to create a new data source but the remote Producer "
+ "has not yet initialized the connection");
+ return;
+ }
+ auto cmd = ipc::AsyncResult<protos::GetAsyncCommandResponse>::Create();
+ cmd.set_has_more(true);
+ cmd->mutable_setup_data_source()->set_new_instance_id(dsid);
+ cfg.ToProto(cmd->mutable_setup_data_source()->mutable_config());
+ async_producer_commands.Resolve(std::move(cmd));
+}
+
// Invoked by the |core_service_| business logic when it wants to start a new
// data source.
void ProducerIPCService::RemoteProducer::StartDataSource(
diff --git a/src/tracing/ipc/service/producer_ipc_service.h b/src/tracing/ipc/service/producer_ipc_service.h
index 8446804..caa1978 100644
--- a/src/tracing/ipc/service/producer_ipc_service.h
+++ b/src/tracing/ipc/service/producer_ipc_service.h
@@ -71,6 +71,8 @@
// no connection here, these methods are posted straight away.
void OnConnect() override;
void OnDisconnect() override;
+ void SetupDataSource(DataSourceInstanceID,
+ const DataSourceConfig&) override;
void StartDataSource(DataSourceInstanceID,
const DataSourceConfig&) override;
void StopDataSource(DataSourceInstanceID) override;
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
index 31c7131..ff228c9 100644
--- a/src/tracing/test/mock_consumer.cc
+++ b/src/tracing/test/mock_consumer.cc
@@ -52,6 +52,10 @@
service_endpoint_->EnableTracing(trace_config, std::move(write_into_file));
}
+void MockConsumer::StartTracing() {
+ service_endpoint_->StartTracing();
+}
+
void MockConsumer::DisableTracing() {
service_endpoint_->DisableTracing();
}
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
index 917aff8..bc1c6d4 100644
--- a/src/tracing/test/mock_consumer.h
+++ b/src/tracing/test/mock_consumer.h
@@ -48,6 +48,7 @@
void Connect(TracingService* svc);
void EnableTracing(const TraceConfig&, base::ScopedFile = base::ScopedFile());
+ void StartTracing();
void DisableTracing();
void FreeBuffers();
void WaitForTracingDisabled(uint32_t timeout_ms = 3000);
diff --git a/src/tracing/test/mock_producer.cc b/src/tracing/test/mock_producer.cc
index 8c2f9c5..1294c1b 100644
--- a/src/tracing/test/mock_producer.cc
+++ b/src/tracing/test/mock_producer.cc
@@ -76,12 +76,12 @@
task_runner_->RunUntilCheckpoint(checkpoint_name);
}
-void MockProducer::WaitForDataSourceStart(const std::string& name) {
+void MockProducer::WaitForDataSourceSetup(const std::string& name) {
static int i = 0;
- auto checkpoint_name = "on_ds_start_" + name + "_" + std::to_string(i++);
+ auto checkpoint_name = "on_ds_setup_" + name + "_" + std::to_string(i++);
auto on_ds_start = task_runner_->CreateCheckpoint(checkpoint_name);
EXPECT_CALL(*this,
- StartDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
+ SetupDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
.WillOnce(Invoke([on_ds_start, this](DataSourceInstanceID ds_id,
const DataSourceConfig& cfg) {
EXPECT_FALSE(data_source_instances_.count(cfg.name()));
@@ -95,6 +95,28 @@
task_runner_->RunUntilCheckpoint(checkpoint_name);
}
+void MockProducer::WaitForDataSourceStart(const std::string& name) {
+ static int i = 0;
+ auto checkpoint_name = "on_ds_start_" + name + "_" + std::to_string(i++);
+ auto on_ds_start = task_runner_->CreateCheckpoint(checkpoint_name);
+ EXPECT_CALL(*this,
+ StartDataSource(_, Property(&DataSourceConfig::name, Eq(name))))
+ .WillOnce(Invoke([on_ds_start, this](DataSourceInstanceID ds_id,
+ const DataSourceConfig& cfg) {
+ // The data source might have been seen already through
+ // WaitForDataSourceSetup().
+ if (data_source_instances_.count(cfg.name()) == 0) {
+ auto target_buffer = static_cast<BufferID>(cfg.target_buffer());
+ auto session_id =
+ static_cast<TracingSessionID>(cfg.tracing_session_id());
+ data_source_instances_.emplace(
+ cfg.name(), EnabledDataSource{ds_id, target_buffer, session_id});
+ }
+ on_ds_start();
+ }));
+ task_runner_->RunUntilCheckpoint(checkpoint_name);
+}
+
void MockProducer::WaitForDataSourceStop(const std::string& name) {
static int i = 0;
auto checkpoint_name = "on_ds_stop_" + name + "_" + std::to_string(i++);
diff --git a/src/tracing/test/mock_producer.h b/src/tracing/test/mock_producer.h
index 590a64e..be13084 100644
--- a/src/tracing/test/mock_producer.h
+++ b/src/tracing/test/mock_producer.h
@@ -50,6 +50,7 @@
void RegisterDataSource(const std::string& name, bool ack_stop = false);
void UnregisterDataSource(const std::string& name);
void WaitForTracingSetup();
+ void WaitForDataSourceSetup(const std::string& name);
void WaitForDataSourceStart(const std::string& name);
void WaitForDataSourceStop(const std::string& name);
DataSourceInstanceID GetDataSourceInstanceId(const std::string& name);
@@ -68,6 +69,8 @@
// Producer implementation.
MOCK_METHOD0(OnConnect, void());
MOCK_METHOD0(OnDisconnect, void());
+ MOCK_METHOD2(SetupDataSource,
+ void(DataSourceInstanceID, const DataSourceConfig&));
MOCK_METHOD2(StartDataSource,
void(DataSourceInstanceID, const DataSourceConfig&));
MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index 26aa5cd..8060ab7 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -56,6 +56,8 @@
// Producer implementation.
MOCK_METHOD0(OnConnect, void());
MOCK_METHOD0(OnDisconnect, void());
+ MOCK_METHOD2(SetupDataSource,
+ void(DataSourceInstanceID, const DataSourceConfig&));
MOCK_METHOD2(StartDataSource,
void(DataSourceInstanceID, const DataSourceConfig&));
MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
@@ -187,10 +189,31 @@
auto on_create_ds_instance =
task_runner_->CreateCheckpoint("on_create_ds_instance");
EXPECT_CALL(producer_, OnTracingSetup());
+
+ // Store the arguments passed to SetupDataSource() and later check that they
+ // match the ones passed to StartDataSource().
+ DataSourceInstanceID setup_id;
+ perfetto::protos::DataSourceConfig setup_cfg_proto;
+ EXPECT_CALL(producer_, SetupDataSource(_, _))
+ .WillOnce(
+ Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
+ const DataSourceConfig& cfg) {
+
+ setup_id = id;
+ cfg.ToProto(&setup_cfg_proto);
+ }));
EXPECT_CALL(producer_, StartDataSource(_, _))
.WillOnce(
- Invoke([on_create_ds_instance, &ds_iid, &global_buf_id](
- DataSourceInstanceID id, const DataSourceConfig& cfg) {
+ Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
+ &setup_cfg_proto](DataSourceInstanceID id,
+ const DataSourceConfig& cfg) {
+ // id and config should match the ones passed to SetupDataSource.
+ ASSERT_EQ(id, setup_id);
+ perfetto::protos::DataSourceConfig cfg_proto;
+ cfg.ToProto(&cfg_proto);
+ ASSERT_EQ(cfg_proto.SerializeAsString(),
+ setup_cfg_proto.SerializeAsString());
+
ASSERT_NE(0u, id);
ds_iid = id;
ASSERT_EQ("perfetto.test", cfg.name());
@@ -309,6 +332,7 @@
auto on_create_ds_instance =
task_runner_->CreateCheckpoint("on_create_ds_instance");
EXPECT_CALL(producer_, OnTracingSetup());
+ EXPECT_CALL(producer_, SetupDataSource(_, _));
EXPECT_CALL(producer_, StartDataSource(_, _))
.WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
DataSourceInstanceID, const DataSourceConfig& cfg) {
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 7bd70b3..3c32409 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -76,6 +76,9 @@
void OnDisconnect() override {}
+ void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override {
+ }
+
void StartDataSource(DataSourceInstanceID,
const DataSourceConfig& source_config) override {
auto trace_writer = endpoint_->CreateTraceWriter(
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index ae8ea07..20c4049 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -57,6 +57,9 @@
FAIL() << "Producer unexpectedly disconnected from the service";
}
+void FakeProducer::SetupDataSource(DataSourceInstanceID,
+ const DataSourceConfig&) {}
+
void FakeProducer::StartDataSource(DataSourceInstanceID,
const DataSourceConfig& source_config) {
PERFETTO_DCHECK_THREAD(thread_checker_);
diff --git a/test/fake_producer.h b/test/fake_producer.h
index cc55640..baacb9f 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -46,6 +46,8 @@
// Producer implementation.
void OnConnect() override;
void OnDisconnect() override;
+ void SetupDataSource(DataSourceInstanceID,
+ const DataSourceConfig& source_config) override;
void StartDataSource(DataSourceInstanceID,
const DataSourceConfig& source_config) override;
void StopDataSource(DataSourceInstanceID) override;
diff --git a/ui/src/tracks/chrome_slices/frontend.ts b/ui/src/tracks/chrome_slices/frontend.ts
index 31f63bb..6f64494 100644
--- a/ui/src/tracks/chrome_slices/frontend.ts
+++ b/ui/src/tracks/chrome_slices/frontend.ts
@@ -90,6 +90,9 @@
timeScale.timeToPx(data.start),
timeScale.timeToPx(data.end), );
+ ctx.font = '12px Google Sans';
+ ctx.textAlign = 'center';
+
// measuretext is expensive so we only use it once.
const charWidth = ctx.measureText('abcdefghij').width / 10;
const pxEnd = timeScale.timeToPx(visibleWindowTime.end);
diff --git a/ui/src/tracks/cpu_slices/common.ts b/ui/src/tracks/cpu_slices/common.ts
index 38844f1..5f59072 100644
--- a/ui/src/tracks/cpu_slices/common.ts
+++ b/ui/src/tracks/cpu_slices/common.ts
@@ -14,7 +14,20 @@
export const CPU_SLICE_TRACK_KIND = 'CpuSliceTrack';
-export interface Data {
+export interface SummaryData {
+ kind: 'summary';
+
+ start: number;
+ end: number;
+ resolution: number;
+
+ bucketSizeSeconds: number;
+ utilizations: Float64Array;
+}
+
+export interface SliceData {
+ kind: 'slice';
+
start: number;
end: number;
resolution: number;
@@ -25,4 +38,7 @@
utids: Uint32Array;
}
+export type Data = SummaryData | SliceData;
+
+
export interface Config { cpu: number; }
diff --git a/ui/src/tracks/cpu_slices/controller.ts b/ui/src/tracks/cpu_slices/controller.ts
index 77fbdf0..a3f1098 100644
--- a/ui/src/tracks/cpu_slices/controller.ts
+++ b/ui/src/tracks/cpu_slices/controller.ts
@@ -18,7 +18,13 @@
trackControllerRegistry
} from '../../controller/track_controller';
-import {Config, CPU_SLICE_TRACK_KIND, Data} from './common';
+import {
+ Config,
+ CPU_SLICE_TRACK_KIND,
+ Data,
+ SliceData,
+ SummaryData
+} from './common';
class CpuSliceTrackController extends TrackController<Config, Data> {
static readonly kind = CPU_SLICE_TRACK_KIND;
@@ -36,7 +42,6 @@
const startNs = Math.round(start * 1e9);
const endNs = Math.round(end * 1e9);
- const resolutionNs = Math.round(resolution * 1e9);
this.busy = true;
if (this.setup === false) {
@@ -47,22 +52,81 @@
this.setup = true;
}
+ // |resolution| is in s/px (to nearest power of 10). Assuming a display
+ // of ~1000px, a resolution of 0.001 s/px corresponds to ~1s on screen.
+ const isQuantized = resolution >= 0.001;
+ // |resolution| is in s/px; we want the number of ns in a 10px window:
+ const bucketSizeNs = Math.round(resolution * 10 * 1e9);
+ let windowStartNs = startNs;
+ if (isQuantized) {
+ windowStartNs = Math.floor(windowStartNs / bucketSizeNs) * bucketSizeNs;
+ }
+ const windowDurNs = endNs - windowStartNs;
+
this.query(`update window_${this.trackState.id} set
- window_start=${startNs},
- window_dur=${endNs - startNs}
+ window_start=${windowStartNs},
+ window_dur=${windowDurNs},
+ quantum=${isQuantized ? bucketSizeNs : 0}
where rowid = 0;`);
- const LIMIT = 10000;
- const query = `select ts,dur,utid from span_${this.trackState.id}
+ if (isQuantized) {
+ this.publish(await this.computeSummary(
+ fromNs(windowStartNs), end, resolution, bucketSizeNs));
+ } else {
+ this.publish(
+ await this.computeSlices(fromNs(windowStartNs), end, resolution));
+ }
+ this.busy = false;
+ }
+
+ private async computeSummary(
+ start: number, end: number, resolution: number,
+ bucketSizeNs: number): Promise<SummaryData> {
+ const startNs = Math.round(start * 1e9);
+ const endNs = Math.round(end * 1e9);
+ const numBuckets = Math.ceil((endNs - startNs) / bucketSizeNs);
+
+ const query = `select
+ quantum_ts as bucket,
+ sum(dur)/cast(${bucketSizeNs} as float) as utilization
+ from span_${this.trackState.id}
where cpu = ${this.config.cpu}
and utid != 0
- and dur >= ${resolutionNs}
+ group by quantum_ts`;
+
+ const rawResult = await this.query(query);
+ const numRows = +rawResult.numRecords;
+
+ const summary: Data = {
+ kind: 'summary',
+ start,
+ end,
+ resolution,
+ bucketSizeSeconds: fromNs(bucketSizeNs),
+ utilizations: new Float64Array(numBuckets),
+ };
+ const cols = rawResult.columns;
+ for (let row = 0; row < numRows; row++) {
+ const bucket = +cols[0].longValues![row];
+ summary.utilizations[bucket] = +cols[1].doubleValues![row];
+ }
+ return summary;
+ }
+
+ private async computeSlices(start: number, end: number, resolution: number):
+ Promise<SliceData> {
+ // TODO(hjd): Remove LIMIT
+ const LIMIT = 10000;
+
+ const query = `select ts,dur,utid from span_${this.trackState.id}
+ where cpu = ${this.config.cpu}
+ and utid != 0
limit ${LIMIT};`;
const rawResult = await this.query(query);
const numRows = +rawResult.numRecords;
-
- const slices: Data = {
+ const slices: SliceData = {
+ kind: 'slice',
start,
end,
resolution,
@@ -81,13 +145,13 @@
if (numRows === LIMIT) {
slices.end = slices.ends[slices.ends.length - 1];
}
- this.publish(slices);
- this.busy = false;
+ return slices;
}
private async query(query: string) {
const result = await this.engine.query(query);
if (result.error) {
+ console.error(`Query error "${query}": ${result.error}`);
throw new Error(`Query error "${query}": ${result.error}`);
}
return result;
diff --git a/ui/src/tracks/cpu_slices/frontend.ts b/ui/src/tracks/cpu_slices/frontend.ts
index dc2913b..ec01b08 100644
--- a/ui/src/tracks/cpu_slices/frontend.ts
+++ b/ui/src/tracks/cpu_slices/frontend.ts
@@ -20,9 +20,16 @@
import {Track} from '../../frontend/track';
import {trackRegistry} from '../../frontend/track_registry';
-import {Config, CPU_SLICE_TRACK_KIND, Data} from './common';
+import {
+ Config,
+ CPU_SLICE_TRACK_KIND,
+ Data,
+ SliceData,
+ SummaryData
+} from './common';
-const MARGIN_TOP = 5;
+// 0.5 Makes the horizontal lines sharp.
+const MARGIN_TOP = 5.5;
const RECT_HEIGHT = 30;
function cropText(str: string, charWidth: number, rectWidth: number) {
@@ -56,9 +63,13 @@
private hoveredUtid = -1;
private mouseXpos?: number;
private reqPending = false;
+ private hue: number;
constructor(trackState: TrackState) {
super(trackState);
+ // TODO: this needs to be kept in sync with the hue generation algorithm
+ // of overview_timeline_panel.ts
+ this.hue = (128 + (32 * this.config.cpu)) % 256;
}
reqDataDeferred() {
@@ -73,7 +84,6 @@
renderCanvas(ctx: CanvasRenderingContext2D): void {
// TODO: fonts and colors should come from the CSS and not hardcoded here.
-
const {timeScale, visibleWindowTime} = globals.frontendLocalState;
const data = this.data();
@@ -82,20 +92,13 @@
const inRange = data !== undefined &&
(visibleWindowTime.start >= data.start &&
visibleWindowTime.end <= data.end);
- if (!inRange || data.resolution > getCurResolution()) {
+ if (!inRange || data.resolution !== getCurResolution()) {
if (!this.reqPending) {
this.reqPending = true;
setTimeout(() => this.reqDataDeferred(), 50);
}
- if (data === undefined) return; // Can't possibly draw anything.
}
- ctx.textAlign = 'center';
- ctx.font = '12px Google Sans';
- const charWidth = ctx.measureText('dbpqaouk').width / 8;
-
- // TODO: this needs to be kept in sync with the hue generation algorithm
- // of overview_timeline_panel.ts
- const hue = (128 + (32 * this.config.cpu)) % 256;
+ if (data === undefined) return; // Can't possibly draw anything.
// If the cached trace slices don't fully cover the visible time range,
// show a gray rectangle with a "Loading..." label.
@@ -104,10 +107,50 @@
timeScale.timeToPx(visibleWindowTime.start),
timeScale.timeToPx(visibleWindowTime.end),
timeScale.timeToPx(data.start),
- timeScale.timeToPx(data.end), );
+ timeScale.timeToPx(data.end));
+ if (data.kind === 'summary') {
+ this.renderSummary(ctx, data);
+ } else if (data.kind === 'slice') {
+ this.renderSlices(ctx, data);
+ }
+ }
+
+ renderSummary(ctx: CanvasRenderingContext2D, data: SummaryData): void {
+ const {timeScale, visibleWindowTime} = globals.frontendLocalState;
+ const startPx = Math.floor(timeScale.timeToPx(visibleWindowTime.start));
+ const bottomY = MARGIN_TOP + RECT_HEIGHT;
+
+ let lastX = startPx;
+ let lastY = bottomY;
+
+ ctx.fillStyle = `hsl(${this.hue}, 50%, 60%)`;
+ ctx.beginPath();
+ ctx.moveTo(lastX, lastY);
+ for (let i = 0; i < data.utilizations.length; i++) {
+ const utilization = data.utilizations[i];
+ const startTime = i * data.bucketSizeSeconds + data.start;
+
+ lastX = Math.floor(timeScale.timeToPx(startTime));
+
+ ctx.lineTo(lastX, lastY);
+ lastY = MARGIN_TOP + Math.round(RECT_HEIGHT * (1 - utilization));
+ ctx.lineTo(lastX, lastY);
+ }
+ ctx.lineTo(lastX, bottomY);
+ ctx.closePath();
+ ctx.fill();
+ }
+
+ renderSlices(ctx: CanvasRenderingContext2D, data: SliceData): void {
+ const {timeScale, visibleWindowTime} = globals.frontendLocalState;
assertTrue(data.starts.length === data.ends.length);
assertTrue(data.starts.length === data.utids.length);
+
+ ctx.textAlign = 'center';
+ ctx.font = '12px Google Sans';
+ const charWidth = ctx.measureText('dbpqaouk').width / 8;
+
for (let i = 0; i < data.starts.length; i++) {
const tStart = data.starts[i];
const tEnd = data.ends[i];
@@ -121,7 +164,7 @@
if (rectWidth < 0.1) continue;
const hovered = this.hoveredUtid === utid;
- ctx.fillStyle = `hsl(${hue}, 50%, ${hovered ? 25 : 60}%`;
+ ctx.fillStyle = `hsl(${this.hue}, 50%, ${hovered ? 25 : 60}%)`;
ctx.fillRect(rectStart, MARGIN_TOP, rectEnd - rectStart, RECT_HEIGHT);
// TODO: consider de-duplicating this code with the copied one from
@@ -171,7 +214,7 @@
onMouseMove({x, y}: {x: number, y: number}) {
const data = this.data();
this.mouseXpos = x;
- if (data === undefined) return;
+ if (data === undefined || data.kind === 'summary') return;
const {timeScale} = globals.frontendLocalState;
if (y < MARGIN_TOP || y > MARGIN_TOP + RECT_HEIGHT) {
this.hoveredUtid = -1;