Add initial Tracing Service implementation
The code still has lot of rough edges and TODOs, but
is in a good state to test integration with other system
components and cover CTS tests.
Bug: 70284518
Change-Id: Id6ac73426750cc4002d4e5bca65a66f4684c116b
diff --git a/Android.bp b/Android.bp
index 024a3e5..b301a5c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -448,6 +448,7 @@
"src/tracing/ipc/service/service_ipc_host_impl.cc",
"src/tracing/test/aligned_buffer_test.cc",
"src/tracing/test/test_shared_memory.cc",
+ "src/tracing/test/tracing_integration_test.cc",
"tools/sanitizers_unittests/sanitizers_unittest.cc",
],
shared_libs: [
diff --git a/include/perfetto/tracing/core/shared_memory_abi.h b/include/perfetto/tracing/core/shared_memory_abi.h
index 5650291..eb068d3 100644
--- a/include/perfetto/tracing/core/shared_memory_abi.h
+++ b/include/perfetto/tracing/core/shared_memory_abi.h
@@ -429,6 +429,12 @@
(kAllChunksComplete & ((1 << (num_chunks * kChunkShift)) - 1));
}
+ // It is safe to call this only from the Service after having observed, with
+ // acquire semantics, that the page has been properly partitioned.
+ uint16_t get_target_buffer(size_t page_idx) {
+ return page_header(page_idx)->target_buffer.load(std::memory_order_relaxed);
+ }
+
// For testing / debugging only.
std::string page_header_dbg(size_t page_idx) {
uint32_t x = page_header(page_idx)->layout.load(std::memory_order_relaxed);
diff --git a/protos/test_event.proto b/protos/test_event.proto
index bac6322..f43e4bd 100644
--- a/protos/test_event.proto
+++ b/protos/test_event.proto
@@ -19,4 +19,6 @@
package perfetto.protos;
-message TestEvent {}
+message TestEvent {
+ optional string str = 1;
+}
diff --git a/src/base/debug_crash_stack_trace.cc b/src/base/debug_crash_stack_trace.cc
index 95d2abf..a81fefc 100644
--- a/src/base/debug_crash_stack_trace.cc
+++ b/src/base/debug_crash_stack_trace.cc
@@ -115,7 +115,7 @@
PrintHex(reinterpret_cast<uintptr_t>(info->si_addr));
Print("\n\nBacktrace:\n");
- const size_t kMaxFrames = 32;
+ const size_t kMaxFrames = 64;
uintptr_t frames[kMaxFrames];
StackCrawlState unwind_state(frames, kMaxFrames);
_Unwind_Backtrace(&TraceStackFrame, &unwind_state);
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index e805494..0b0a8e6 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -100,6 +100,7 @@
"test/aligned_buffer_test.h",
"test/test_shared_memory.cc",
"test/test_shared_memory.h",
+ "test/tracing_integration_test.cc",
]
}
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index 6785bdb..b0cc33c 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -17,20 +17,26 @@
#include "src/tracing/core/service_impl.h"
#include <inttypes.h>
+#include <string.h>
#include <algorithm>
#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
+#include "perfetto/protozero/proto_utils.h"
#include "perfetto/tracing/core/consumer.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/producer.h"
#include "perfetto/tracing/core/shared_memory.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
namespace perfetto {
// TODO add ThreadChecker everywhere.
+using protozero::proto_utils::ParseVarInt;
+
namespace {
constexpr size_t kPageSize = 4096;
constexpr size_t kDefaultShmSize = kPageSize * 16; // 64 KB.
@@ -47,7 +53,9 @@
ServiceImpl::ServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,
base::TaskRunner* task_runner)
- : shm_factory_(std::move(shm_factory)), task_runner_(task_runner) {
+ : task_runner_(task_runner),
+ shm_factory_(std::move(shm_factory)),
+ buffer_ids_(kMaxTraceBuffers) {
PERFETTO_DCHECK(task_runner_);
}
@@ -105,22 +113,210 @@
consumers_.erase(consumer);
}
-void ServiceImpl::EnableTracing(ConsumerEndpointImpl*, const TraceConfig&) {
- PERFETTO_DLOG("not implemented yet");
+void ServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
+ const TraceConfig& cfg) {
+ if (tracing_sessions_.count(consumer)) {
+ PERFETTO_DLOG(
+ "A Consumer is trying to EnableTracing() but another tracing session "
+ "is already active");
+ // TODO(primiano): make this a bool and return failure to the IPC layer.
+ return;
+ }
+ TracingSession& ts =
+ tracing_sessions_.emplace(consumer, TracingSession{}).first->second;
+
+ // Initialize the log buffers.
+ bool did_allocate_all_buffers = true;
+ for (const TraceConfig::BufferConfig& buffer_cfg : cfg.buffers()) {
+ BufferID id = static_cast<BufferID>(buffer_ids_.Allocate());
+ if (!id) {
+ did_allocate_all_buffers = false;
+ break;
+ }
+ PERFETTO_DCHECK(ts.trace_buffers.count(id) == 0);
+ // TODO(primiano): make TraceBuffer::kBufferPageSize dynamic.
+ ts.trace_buffers.emplace(id, TraceBuffer(buffer_cfg.size_kb() * 1024u));
+ }
+
+ // This can happen if all the kMaxTraceBuffers slots are taken (i.e. we are
+ // not talking about OOM here, just creating > kMaxTraceBuffers entries). In
+ // this case, free all the previously allocated buffers and abort.
+ // TODO: add a test to cover this case, this is quite subtle.
+ if (!did_allocate_all_buffers) {
+ for (const auto& kv : ts.trace_buffers)
+ buffer_ids_.Free(kv.first);
+ ts.trace_buffers.clear();
+ return; // TODO(primiano): return failure condition?
+ }
+
+ // Enable the data sources on the producers.
+ for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
+ // Scan all the registered data sources with a matching name.
+ auto range = data_sources_.equal_range(cfg_data_source.config().name());
+ for (auto it = range.first; it != range.second; it++) {
+ const RegisteredDataSource& reg_data_source = it->second;
+ // TODO(primiano): match against |producer_name_filter|.
+
+ const ProducerID producer_id = reg_data_source.producer_id;
+ auto producer_iter = producers_.find(producer_id);
+ if (producer_iter == producers_.end()) {
+ PERFETTO_DCHECK(false); // Something in the unregistration is broken.
+ continue;
+ }
+ ProducerEndpointImpl* producer = producer_iter->second;
+ DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
+ ts.data_source_instances.emplace(producer_id, inst_id);
+ producer->producer()->CreateDataSourceInstance(inst_id,
+ cfg_data_source.config());
+ }
+ }
+} // namespace perfetto
+
+void ServiceImpl::DisableTracing(ConsumerEndpointImpl* consumer) {
+ auto it = tracing_sessions_.find(consumer);
+ if (it == tracing_sessions_.end()) {
+ PERFETTO_DLOG("No active tracing session found for the Consumer");
+ return;
+ }
+ TracingSession& tracing_session = it->second;
+ for (const auto& data_source_inst : tracing_session.data_source_instances) {
+ auto producer_it = producers_.find(data_source_inst.first);
+ if (producer_it == producers_.end())
+ continue; // This could legitimately happen if a Producer disconnects.
+ producer_it->second->producer()->TearDownDataSourceInstance(
+ data_source_inst.second);
+ }
+ tracing_session.data_source_instances.clear();
}
-void ServiceImpl::DisableTracing(ConsumerEndpointImpl*) {
- PERFETTO_DLOG("not implemented yet");
-}
+void ServiceImpl::ReadBuffers(ConsumerEndpointImpl* consumer) {
+ auto it = tracing_sessions_.find(consumer);
+ if (it == tracing_sessions_.end()) {
+ PERFETTO_DLOG(
+ "Consumer invoked ReadBuffers() but no tracing session is active");
+ return; // TODO(primiano): signal failure?
+ }
+ // TODO(primiano): Most of this code is temporary and we should find a better
+ // solution to bookkeep the log buffer (e.g., an allocator-like freelist)
+ // rather than leveraging the SharedMemoryABI (which is intended only for the
+ // Producer <> Service SMB and not for the TraceBuffer itself).
+ auto weak_consumer = consumer->GetWeakPtr();
+ for (auto& buf_it : it->second.trace_buffers) {
+ TraceBuffer& tbuf = buf_it.second;
+ SharedMemoryABI& abi = *tbuf.abi;
+ for (size_t i = 0; i < tbuf.num_pages(); i++) {
+ const size_t page_idx = (i + tbuf.cur_page) % tbuf.num_pages();
+ if (abi.is_page_free(page_idx))
+ continue;
+ uint32_t layout = abi.page_layout_dbg(page_idx);
+ size_t num_chunks = abi.GetNumChunksForLayout(layout);
+ for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+ if (abi.GetChunkState(page_idx, chunk_idx) ==
+ SharedMemoryABI::kChunkFree) {
+ continue;
+ }
+ auto chunk = abi.GetChunkUnchecked(page_idx, layout, chunk_idx);
+ uint16_t num_packets;
+ uint8_t flags;
+ std::tie(num_packets, flags) = chunk.GetPacketCountAndFlags();
+ const uint8_t* ptr = chunk.payload_begin();
-void ServiceImpl::ReadBuffers(ConsumerEndpointImpl*) {
- PERFETTO_DLOG("not implemented yet");
+ // shared_ptr is really a workardound for the fact that is not possible
+ // to std::move() move-only types in labmdas until C++17.
+ std::shared_ptr<std::vector<TracePacket>> packets(
+ new std::vector<TracePacket>());
+ packets->reserve(num_packets);
+
+ for (size_t pack_idx = 0; pack_idx < num_packets; pack_idx++) {
+ uint64_t pack_size = 0;
+ ptr = ParseVarInt(ptr, chunk.end(), &pack_size);
+ // TODO stitching, look at the flags.
+ bool skip = (pack_idx == 0 &&
+ flags & SharedMemoryABI::ChunkHeader::
+ kFirstPacketContinuesFromPrevChunk) ||
+ (pack_idx == num_packets - 1 &&
+ flags & SharedMemoryABI::ChunkHeader::
+ kLastPacketContinuesOnNextChunk);
+
+ PERFETTO_DLOG(" #%-3zu len:%" PRIu64 " skip: %d\n", pack_idx,
+ pack_size, skip);
+ if (ptr > chunk.end() - pack_size) {
+ PERFETTO_DLOG("out of bounds!\n");
+ break;
+ }
+ if (!skip) {
+ packets->emplace_back();
+ packets->back().AddChunk(Chunk(ptr, pack_size));
+ }
+ ptr += pack_size;
+ } // for(packet)
+ task_runner_->PostTask([weak_consumer, packets]() {
+ if (weak_consumer)
+ weak_consumer->consumer()->OnTraceData(*packets, true /*has_more*/);
+ });
+ } // for(chunk)
+ } // for(page_idx)
+ } // for(buffer_id)
+ task_runner_->PostTask([weak_consumer]() {
+ if (weak_consumer)
+ weak_consumer->consumer()->OnTraceData(std::vector<TracePacket>(),
+ false /*has_more*/);
+ });
}
void ServiceImpl::FreeBuffers(ConsumerEndpointImpl*) {
+ // TODO(primiano): implement here.
PERFETTO_DLOG("not implemented yet");
}
+void ServiceImpl::RegisterDataSource(ProducerID producer_id,
+ DataSourceID ds_id,
+ const DataSourceDescriptor& desc) {
+ PERFETTO_DCHECK(!desc.name().empty());
+ data_sources_.emplace(desc.name(),
+ RegisteredDataSource{producer_id, ds_id, desc});
+}
+
+void ServiceImpl::CopyProducerPageIntoLogBuffer(ProducerID producer_id,
+ BufferID target_buffer,
+ const uint8_t* src,
+ size_t size) {
+ // TODO right now the page_size in the SMB and the trace_buffers_ can
+ // mismatch. Remove the ability to decide the page size on the Producer.
+
+ // TODO(primiano): We should have a direct index to find the TargetBuffer and
+ // perform ACL checks without iterating through all the producers.
+ TraceBuffer* tbuf = nullptr;
+ for (auto& sessions_it : tracing_sessions_) {
+ for (auto& tbuf_it : sessions_it.second.trace_buffers) {
+ const BufferID id = tbuf_it.first;
+ if (id == target_buffer) {
+ // TODO(primiano): we should have some stronger check to prevent that
+ // the Producer passes |target_buffer| which is valid, but that we never
+ // asked it to use. Essentially we want to prevent a malicious producer
+ // to inject data into a log buffer that has nothing to do with it.
+ tbuf = &tbuf_it.second;
+ break;
+ }
+ }
+ }
+
+ if (!tbuf) {
+ PERFETTO_DLOG("Could not find target buffer %u for producer %" PRIu64,
+ target_buffer, producer_id);
+ return;
+ }
+
+ PERFETTO_DCHECK(size == TraceBuffer::kBufferPageSize);
+ uint8_t* dst = tbuf->get_next_page();
+
+ // TODO(primiano): use sendfile(). Requires to make the tbuf itself
+ // a file descriptor (just use SharedMemory without sharing it).
+ PERFETTO_DLOG("Copying page %p from producer %" PRIu64,
+ reinterpret_cast<const void*>(src), producer_id);
+ memcpy(dst, src, size);
+}
+
////////////////////////////////////////////////////////////////////////////////
// ServiceImpl::ConsumerEndpointImpl implementation
////////////////////////////////////////////////////////////////////////////////
@@ -170,7 +366,14 @@
service_(service),
task_runner_(task_runner),
producer_(std::move(producer)),
- shared_memory_(std::move(shared_memory)) {}
+ shared_memory_(std::move(shared_memory)),
+ shmem_abi_(reinterpret_cast<uint8_t*>(shared_memory_->start()),
+ shared_memory_->size(),
+ kPageSize) {
+ // TODO(primiano): make the page-size for the SHM dynamic and find a way to
+ // communicate that to the Producer (add a field to the
+ // InitializeConnectionResponse IPC).
+}
ServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
producer_->OnDisconnect();
@@ -178,23 +381,48 @@
}
void ServiceImpl::ProducerEndpointImpl::RegisterDataSource(
- const DataSourceDescriptor&,
+ const DataSourceDescriptor& desc,
RegisterDataSourceCallback callback) {
- const DataSourceID dsid = ++last_data_source_id_;
- task_runner_->PostTask(std::bind(std::move(callback), dsid));
- // TODO implement the bookkeeping logic.
+ DataSourceID ds_id = ++last_data_source_id_;
+ if (!desc.name().empty()) {
+ service_->RegisterDataSource(id_, ds_id, desc);
+ } else {
+ PERFETTO_DLOG("Received RegisterDataSource() with empty name");
+ ds_id = 0;
+ }
+ task_runner_->PostTask(std::bind(std::move(callback), ds_id));
}
void ServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
DataSourceID dsid) {
PERFETTO_CHECK(dsid);
- // TODO implement the bookkeeping logic.
+ // TODO(primiano): implement the bookkeeping logic.
}
void ServiceImpl::ProducerEndpointImpl::NotifySharedMemoryUpdate(
const std::vector<uint32_t>& changed_pages) {
- // TODO implement the bookkeeping logic.
- return;
+ for (uint32_t page_idx : changed_pages) {
+ if (page_idx >= shmem_abi_.num_pages())
+ continue; // Very likely a malicious producer playing dirty.
+
+ if (!shmem_abi_.is_page_complete(page_idx))
+ continue;
+ if (!shmem_abi_.TryAcquireAllChunksForReading(page_idx))
+ continue;
+
+ // TODO: we should start collecting individual chunks from non fully
+ // complete pages after a while.
+
+ service_->CopyProducerPageIntoLogBuffer(
+ id_, shmem_abi_.get_target_buffer(page_idx),
+ shmem_abi_.page_start(page_idx), shmem_abi_.page_size());
+
+ shmem_abi_.ReleaseAllChunksAsFree(page_idx);
+ }
+}
+
+SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const {
+ return shared_memory_.get();
}
std::unique_ptr<TraceWriter>
@@ -206,8 +434,29 @@
PERFETTO_CHECK(false);
}
-SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const {
- return shared_memory_.get();
+////////////////////////////////////////////////////////////////////////////////
+// ServiceImpl::TraceBuffer implementation
+////////////////////////////////////////////////////////////////////////////////
+
+ServiceImpl::TraceBuffer::TraceBuffer(size_t sz) : size(sz) {
+ void* ptr = nullptr;
+ PERFETTO_CHECK(size % kBufferPageSize == 0);
+ // TODO(primiano): introduce base::PageAllocator and use mmap() instead.
+ if (posix_memalign(&ptr, kPageSize, size)) {
+ PERFETTO_ELOG("Trace buffer allocation failed (size: %zu, page_size: %zu)",
+ size, kBufferPageSize);
+ return;
+ }
+ PERFETTO_CHECK(ptr);
+ memset(ptr, 0, size);
+ data.reset(ptr);
+ abi.reset(new SharedMemoryABI(get_page(0), size, kBufferPageSize));
}
+ServiceImpl::TraceBuffer::~TraceBuffer() = default;
+ServiceImpl::TraceBuffer::TraceBuffer(ServiceImpl::TraceBuffer&&) noexcept =
+ default;
+ServiceImpl::TraceBuffer& ServiceImpl::TraceBuffer::operator=(
+ ServiceImpl::TraceBuffer&&) = default;
+
} // namespace perfetto
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index 91db0fb..52b1277 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -22,9 +22,13 @@
#include <memory>
#include <set>
+#include "perfetto/base/utils.h"
#include "perfetto/base/weak_ptr.h"
#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "src/tracing/core/id_allocator.h"
namespace perfetto {
@@ -71,6 +75,7 @@
base::TaskRunner* const task_runner_;
Producer* producer_;
std::unique_ptr<SharedMemory> shared_memory_;
+ SharedMemoryABI shmem_abi_;
DataSourceID last_data_source_id_ = 0;
};
@@ -104,6 +109,13 @@
// Called by ProducerEndpointImpl.
void DisconnectProducer(ProducerID);
+ void RegisterDataSource(ProducerID,
+ DataSourceID,
+ const DataSourceDescriptor&);
+ void CopyProducerPageIntoLogBuffer(ProducerID,
+ BufferID,
+ const uint8_t*,
+ size_t);
// Called by ConsumerEndpointImpl.
void DisconnectConsumer(ConsumerEndpointImpl*);
@@ -125,14 +137,76 @@
ProducerEndpointImpl* GetProducer(ProducerID) const;
private:
+ struct RegisteredDataSource {
+ ProducerID producer_id;
+ DataSourceID data_source_id;
+ DataSourceDescriptor descriptor;
+ };
+
+ struct TraceBuffer {
+ // TODO(primiano): make this configurable.
+ static constexpr size_t kBufferPageSize = 4096;
+ explicit TraceBuffer(size_t size);
+ ~TraceBuffer();
+ TraceBuffer(TraceBuffer&&) noexcept;
+ TraceBuffer& operator=(TraceBuffer&&);
+
+ size_t num_pages() const { return size / kBufferPageSize; }
+
+ uint8_t* get_page(size_t page) {
+ PERFETTO_DCHECK(page < num_pages());
+ return reinterpret_cast<uint8_t*>(data.get()) + page * kBufferPageSize;
+ }
+
+ uint8_t* get_next_page() {
+ size_t cur = cur_page;
+ cur_page = cur_page == num_pages() - 1 ? 0 : cur_page + 1;
+ return get_page(cur);
+ }
+
+ size_t size;
+ size_t cur_page = 0; // Write pointer in the ring buffer.
+ std::unique_ptr<void, base::FreeDeleter> data;
+
+ // TODO(primiano): The TraceBuffer is not shared and there is no reason to
+ // use the SharedMemoryABI. This is just a a temporary workaround to reuse
+ // the convenience of SharedMemoryABI for bookkeeping of the buffer when
+ // implementing ReadBuffers().
+ std::unique_ptr<SharedMemoryABI> abi;
+ };
+
+ // Holds the state of a tracing session. A tracing session is uniquely bound
+ // a specific Consumer. Each Consumer can own one or more sessions.
+ struct TracingSession {
+ // List of data source instances that have been enabled on the various
+ // producers for this tracing session.
+ std::multimap<ProducerID, DataSourceInstanceID> data_source_instances;
+
+ // The key of this map matches the |target_buffer| in the
+ // SharedMemoryABI::ChunkHeader.
+ std::map<BufferID, TraceBuffer> trace_buffers;
+ };
+
ServiceImpl(const ServiceImpl&) = delete;
ServiceImpl& operator=(const ServiceImpl&) = delete;
- std::unique_ptr<SharedMemory::Factory> shm_factory_;
base::TaskRunner* const task_runner_;
+ std::unique_ptr<SharedMemory::Factory> shm_factory_;
ProducerID last_producer_id_ = 0;
+ DataSourceInstanceID last_data_source_instance_id_ = 0;
+
+ // Buffer IDs are global across all consumers (because a Producer can produce
+ // data for more than one trace session, hence more than one consumer).
+ IdAllocator buffer_ids_;
+
+ std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
+
+ // TODO(primiano): There doesn't seem to be any good reason why |producers_|
+ // is a map indexed by ID and not just a set<ProducerEndpointImpl*>.
std::map<ProducerID, ProducerEndpointImpl*> producers_;
+
std::set<ConsumerEndpointImpl*> consumers_;
+ std::map<ConsumerEndpointImpl*, TracingSession> tracing_sessions_;
};
} // namespace perfetto
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
new file mode 100644
index 0000000..2747e49
--- /dev/null
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/ipc/posix_shared_memory.h"
+
+#include <inttypes.h>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "perfetto/tracing/core/consumer.h"
+#include "perfetto/tracing/core/data_source_config.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
+#include "perfetto/tracing/core/producer.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "perfetto/tracing/ipc/consumer_ipc_client.h"
+#include "perfetto/tracing/ipc/producer_ipc_client.h"
+#include "perfetto/tracing/ipc/service_ipc_host.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/ipc/test/test_socket.h"
+
+#include "protos/test_event.pbzero.h"
+#include "protos/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace {
+
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
+using testing::_;
+
+constexpr char kProducerSockName[] = TEST_SOCK_NAME("tracing_test-producer");
+constexpr char kConsumerSockName[] = TEST_SOCK_NAME("tracing_test-consumer");
+
+class TracingIntegrationTest : public ::testing::Test {
+ public:
+ void SetUp() override {
+ DESTROY_TEST_SOCK(kProducerSockName);
+ DESTROY_TEST_SOCK(kConsumerSockName);
+ task_runner_.reset(new base::TestTaskRunner());
+ }
+
+ void TearDown() override {
+ task_runner_.reset();
+ DESTROY_TEST_SOCK(kProducerSockName);
+ DESTROY_TEST_SOCK(kConsumerSockName);
+ }
+
+ std::unique_ptr<base::TestTaskRunner> task_runner_;
+};
+
+class MockProducer : public Producer {
+ public:
+ ~MockProducer() override {}
+
+ // Producer implementation.
+ MOCK_METHOD0(OnConnect, void());
+ MOCK_METHOD0(OnDisconnect, void());
+ MOCK_METHOD2(CreateDataSourceInstance,
+ void(DataSourceInstanceID, const DataSourceConfig&));
+ MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
+};
+
+class MockConsumer : public Consumer {
+ public:
+ ~MockConsumer() override {}
+
+ // Producer implementation.
+ MOCK_METHOD0(OnConnect, void());
+ MOCK_METHOD0(OnDisconnect, void());
+ MOCK_METHOD2(OnTraceData,
+ void(const std::vector<TracePacket>&, bool /*has_more*/));
+};
+
+TEST_F(TracingIntegrationTest, WithIPCTransport) {
+ // Create the service host.
+ std::unique_ptr<ServiceIPCHost> svc =
+ ServiceIPCHost::CreateInstance(task_runner_.get());
+ svc->Start(kProducerSockName, kConsumerSockName);
+
+ // Create and connect a Producer.
+ MockProducer producer;
+ std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
+ ProducerIPCClient::Connect(kProducerSockName, &producer,
+ task_runner_.get());
+ auto on_producer_connect =
+ task_runner_->CreateCheckpoint("on_producer_connect");
+ EXPECT_CALL(producer, OnConnect()).WillOnce(Invoke(on_producer_connect));
+ task_runner_->RunUntilCheckpoint("on_producer_connect");
+
+ // Register a data source.
+ DataSourceDescriptor ds_desc;
+ ds_desc.set_name("perfetto.test");
+ auto on_data_source_registered =
+ task_runner_->CreateCheckpoint("on_data_source_registered");
+ producer_endpoint->RegisterDataSource(
+ ds_desc, [on_data_source_registered](DataSourceID dsid) {
+ PERFETTO_DLOG("Registered data source with ID: %" PRIu64, dsid);
+ on_data_source_registered();
+ });
+ task_runner_->RunUntilCheckpoint("on_data_source_registered");
+
+ // Create and connect a Consumer.
+ MockConsumer consumer;
+ std::unique_ptr<Service::ConsumerEndpoint> consumer_endpoint =
+ ConsumerIPCClient::Connect(kConsumerSockName, &consumer,
+ task_runner_.get());
+ auto on_consumer_connect =
+ task_runner_->CreateCheckpoint("on_consumer_connect");
+ EXPECT_CALL(consumer, OnConnect()).WillOnce(Invoke(on_consumer_connect));
+ task_runner_->RunUntilCheckpoint("on_consumer_connect");
+
+ // Start tracing.
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(4096 * 10);
+ auto* ds_config = trace_config.add_data_sources()->mutable_config();
+ ds_config->set_name("perfetto.test");
+ ds_config->set_target_buffer(0);
+ ds_config->set_trace_category_filters("foo,bar");
+ consumer_endpoint->EnableTracing(trace_config);
+
+ // At this point, the Producer should be asked to turn its data source on.
+ DataSourceInstanceID ds_iid = 0;
+ auto on_create_ds_instance =
+ task_runner_->CreateCheckpoint("on_create_ds_instance");
+ EXPECT_CALL(producer, CreateDataSourceInstance(_, _))
+ .WillOnce(
+ Invoke([on_create_ds_instance, &ds_iid](DataSourceInstanceID id,
+ const DataSourceConfig& cfg) {
+ ASSERT_NE(0u, id);
+ ds_iid = id;
+ ASSERT_EQ("perfetto.test", cfg.name());
+ ASSERT_EQ(0u, cfg.target_buffer());
+ ASSERT_EQ("foo,bar", cfg.trace_category_filters());
+ on_create_ds_instance();
+ }));
+ task_runner_->RunUntilCheckpoint("on_create_ds_instance");
+
+ // Now let the data source fill some pages within the same task.
+ // Doing so should accumulate a bunch of chunks that will be notified by the
+ // a future task in one batch.
+ std::unique_ptr<TraceWriter> writer =
+ producer_endpoint->CreateTraceWriter(1 /* target_buffer */);
+ ASSERT_TRUE(writer);
+
+ const size_t kNumPackets = 10;
+ for (size_t i = 0; i < kNumPackets; i++) {
+ char buf[8];
+ sprintf(buf, "evt_%zu", i);
+ writer->NewTracePacket()->set_test_event()->set_str(buf, strlen(buf));
+ }
+
+ // Allow the service to see the NotifySharedMemoryUpdate() before disabling
+ // tracing.
+ task_runner_->RunUntilIdle();
+
+ // Disable tracing.
+ consumer_endpoint->DisableTracing();
+ auto on_teardown_ds_instance =
+ task_runner_->CreateCheckpoint("on_teardown_ds_instance");
+ EXPECT_CALL(producer, TearDownDataSourceInstance(ds_iid))
+ .WillOnce(InvokeWithoutArgs(on_teardown_ds_instance));
+ task_runner_->RunUntilCheckpoint("on_teardown_ds_instance");
+
+ // Read the log buffer.
+ consumer_endpoint->ReadBuffers();
+ size_t num_pack_rx = 0;
+ auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
+ EXPECT_CALL(consumer, OnTraceData(_, _))
+ .Times(kNumPackets)
+ .WillRepeatedly(
+ Invoke([&num_pack_rx, all_packets_rx](
+ const std::vector<TracePacket>& packets, bool has_more) {
+ // TODO(primiano): check contents, requires both pblite and pzero.
+ num_pack_rx += packets.size();
+ if (!has_more)
+ all_packets_rx();
+ }));
+ task_runner_->RunUntilCheckpoint("all_packets_rx");
+
+ // TODO(primiano): cover FreeBuffers.
+
+ // Destroy the service and check that both Producer and Consumer see an
+ // OnDisconnect() call.
+
+ auto on_producer_disconnect =
+ task_runner_->CreateCheckpoint("on_producer_disconnect");
+ EXPECT_CALL(producer, OnDisconnect())
+ .WillOnce(Invoke(on_producer_disconnect));
+
+ auto on_consumer_disconnect =
+ task_runner_->CreateCheckpoint("on_consumer_disconnect");
+ EXPECT_CALL(consumer, OnDisconnect())
+ .WillOnce(Invoke(on_consumer_disconnect));
+
+ svc.reset();
+ task_runner_->RunUntilCheckpoint("on_producer_disconnect");
+ task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
+}
+
+// TODO(primiano): add tests to cover:
+// - unknown fields preserved end-to-end.
+// - >1 data source.
+// - >1 data consumer sharing the same data source, with different TraceBuffers.
+
+} // namespace
+} // namespace perfetto