Add initial Tracing Service implementation

The code still has lot of rough edges and TODOs, but
is in a good state to test integration with other system
components and cover CTS tests.

Bug: 70284518
Change-Id: Id6ac73426750cc4002d4e5bca65a66f4684c116b
diff --git a/Android.bp b/Android.bp
index 024a3e5..b301a5c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -448,6 +448,7 @@
     "src/tracing/ipc/service/service_ipc_host_impl.cc",
     "src/tracing/test/aligned_buffer_test.cc",
     "src/tracing/test/test_shared_memory.cc",
+    "src/tracing/test/tracing_integration_test.cc",
     "tools/sanitizers_unittests/sanitizers_unittest.cc",
   ],
   shared_libs: [
diff --git a/include/perfetto/tracing/core/shared_memory_abi.h b/include/perfetto/tracing/core/shared_memory_abi.h
index 5650291..eb068d3 100644
--- a/include/perfetto/tracing/core/shared_memory_abi.h
+++ b/include/perfetto/tracing/core/shared_memory_abi.h
@@ -429,6 +429,12 @@
            (kAllChunksComplete & ((1 << (num_chunks * kChunkShift)) - 1));
   }
 
+  // It is safe to call this only from the Service after having observed, with
+  // acquire semantics, that the page has been properly partitioned.
+  uint16_t get_target_buffer(size_t page_idx) {
+    return page_header(page_idx)->target_buffer.load(std::memory_order_relaxed);
+  }
+
   // For testing / debugging only.
   std::string page_header_dbg(size_t page_idx) {
     uint32_t x = page_header(page_idx)->layout.load(std::memory_order_relaxed);
diff --git a/protos/test_event.proto b/protos/test_event.proto
index bac6322..f43e4bd 100644
--- a/protos/test_event.proto
+++ b/protos/test_event.proto
@@ -19,4 +19,6 @@
 
 package perfetto.protos;
 
-message TestEvent {}
+message TestEvent {
+  optional string str = 1;
+}
diff --git a/src/base/debug_crash_stack_trace.cc b/src/base/debug_crash_stack_trace.cc
index 95d2abf..a81fefc 100644
--- a/src/base/debug_crash_stack_trace.cc
+++ b/src/base/debug_crash_stack_trace.cc
@@ -115,7 +115,7 @@
   PrintHex(reinterpret_cast<uintptr_t>(info->si_addr));
   Print("\n\nBacktrace:\n");
 
-  const size_t kMaxFrames = 32;
+  const size_t kMaxFrames = 64;
   uintptr_t frames[kMaxFrames];
   StackCrawlState unwind_state(frames, kMaxFrames);
   _Unwind_Backtrace(&TraceStackFrame, &unwind_state);
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index e805494..0b0a8e6 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -100,6 +100,7 @@
     "test/aligned_buffer_test.h",
     "test/test_shared_memory.cc",
     "test/test_shared_memory.h",
+    "test/tracing_integration_test.cc",
   ]
 }
 
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index 6785bdb..b0cc33c 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -17,20 +17,26 @@
 #include "src/tracing/core/service_impl.h"
 
 #include <inttypes.h>
+#include <string.h>
 
 #include <algorithm>
 
 #include "perfetto/base/logging.h"
 #include "perfetto/base/task_runner.h"
+#include "perfetto/protozero/proto_utils.h"
 #include "perfetto/tracing/core/consumer.h"
 #include "perfetto/tracing/core/data_source_config.h"
 #include "perfetto/tracing/core/producer.h"
 #include "perfetto/tracing/core/shared_memory.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
 
 namespace perfetto {
 
 // TODO add ThreadChecker everywhere.
 
+using protozero::proto_utils::ParseVarInt;
+
 namespace {
 constexpr size_t kPageSize = 4096;
 constexpr size_t kDefaultShmSize = kPageSize * 16;  // 64 KB.
@@ -47,7 +53,9 @@
 
 ServiceImpl::ServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,
                          base::TaskRunner* task_runner)
-    : shm_factory_(std::move(shm_factory)), task_runner_(task_runner) {
+    : task_runner_(task_runner),
+      shm_factory_(std::move(shm_factory)),
+      buffer_ids_(kMaxTraceBuffers) {
   PERFETTO_DCHECK(task_runner_);
 }
 
@@ -105,22 +113,210 @@
   consumers_.erase(consumer);
 }
 
-void ServiceImpl::EnableTracing(ConsumerEndpointImpl*, const TraceConfig&) {
-  PERFETTO_DLOG("not implemented yet");
+void ServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
+                                const TraceConfig& cfg) {
+  if (tracing_sessions_.count(consumer)) {
+    PERFETTO_DLOG(
+        "A Consumer is trying to EnableTracing() but another tracing session "
+        "is already active");
+    // TODO(primiano): make this a bool and return failure to the IPC layer.
+    return;
+  }
+  TracingSession& ts =
+      tracing_sessions_.emplace(consumer, TracingSession{}).first->second;
+
+  // Initialize the log buffers.
+  bool did_allocate_all_buffers = true;
+  for (const TraceConfig::BufferConfig& buffer_cfg : cfg.buffers()) {
+    BufferID id = static_cast<BufferID>(buffer_ids_.Allocate());
+    if (!id) {
+      did_allocate_all_buffers = false;
+      break;
+    }
+    PERFETTO_DCHECK(ts.trace_buffers.count(id) == 0);
+    // TODO(primiano): make TraceBuffer::kBufferPageSize dynamic.
+    ts.trace_buffers.emplace(id, TraceBuffer(buffer_cfg.size_kb() * 1024u));
+  }
+
+  // This can happen if all the kMaxTraceBuffers slots are taken (i.e. we are
+  // not talking about OOM here, just creating > kMaxTraceBuffers entries). In
+  // this case, free all the previously allocated buffers and abort.
+  // TODO: add a test to cover this case, this is quite subtle.
+  if (!did_allocate_all_buffers) {
+    for (const auto& kv : ts.trace_buffers)
+      buffer_ids_.Free(kv.first);
+    ts.trace_buffers.clear();
+    return;  // TODO(primiano): return failure condition?
+  }
+
+  // Enable the data sources on the producers.
+  for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
+    // Scan all the registered data sources with a matching name.
+    auto range = data_sources_.equal_range(cfg_data_source.config().name());
+    for (auto it = range.first; it != range.second; it++) {
+      const RegisteredDataSource& reg_data_source = it->second;
+      // TODO(primiano): match against |producer_name_filter|.
+
+      const ProducerID producer_id = reg_data_source.producer_id;
+      auto producer_iter = producers_.find(producer_id);
+      if (producer_iter == producers_.end()) {
+        PERFETTO_DCHECK(false);  // Something in the unregistration is broken.
+        continue;
+      }
+      ProducerEndpointImpl* producer = producer_iter->second;
+      DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
+      ts.data_source_instances.emplace(producer_id, inst_id);
+      producer->producer()->CreateDataSourceInstance(inst_id,
+                                                     cfg_data_source.config());
+    }
+  }
+}  // namespace perfetto
+
+void ServiceImpl::DisableTracing(ConsumerEndpointImpl* consumer) {
+  auto it = tracing_sessions_.find(consumer);
+  if (it == tracing_sessions_.end()) {
+    PERFETTO_DLOG("No active tracing session found for the Consumer");
+    return;
+  }
+  TracingSession& tracing_session = it->second;
+  for (const auto& data_source_inst : tracing_session.data_source_instances) {
+    auto producer_it = producers_.find(data_source_inst.first);
+    if (producer_it == producers_.end())
+      continue;  // This could legitimately happen if a Producer disconnects.
+    producer_it->second->producer()->TearDownDataSourceInstance(
+        data_source_inst.second);
+  }
+  tracing_session.data_source_instances.clear();
 }
 
-void ServiceImpl::DisableTracing(ConsumerEndpointImpl*) {
-  PERFETTO_DLOG("not implemented yet");
-}
+void ServiceImpl::ReadBuffers(ConsumerEndpointImpl* consumer) {
+  auto it = tracing_sessions_.find(consumer);
+  if (it == tracing_sessions_.end()) {
+    PERFETTO_DLOG(
+        "Consumer invoked ReadBuffers() but no tracing session is active");
+    return;  // TODO(primiano): signal failure?
+  }
+  // TODO(primiano): Most of this code is temporary and we should find a better
+  // solution to bookkeep the log buffer (e.g., an allocator-like freelist)
+  // rather than leveraging the SharedMemoryABI (which is intended only for the
+  // Producer <> Service SMB and not for the TraceBuffer itself).
+  auto weak_consumer = consumer->GetWeakPtr();
+  for (auto& buf_it : it->second.trace_buffers) {
+    TraceBuffer& tbuf = buf_it.second;
+    SharedMemoryABI& abi = *tbuf.abi;
+    for (size_t i = 0; i < tbuf.num_pages(); i++) {
+      const size_t page_idx = (i + tbuf.cur_page) % tbuf.num_pages();
+      if (abi.is_page_free(page_idx))
+        continue;
+      uint32_t layout = abi.page_layout_dbg(page_idx);
+      size_t num_chunks = abi.GetNumChunksForLayout(layout);
+      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+        if (abi.GetChunkState(page_idx, chunk_idx) ==
+            SharedMemoryABI::kChunkFree) {
+          continue;
+        }
+        auto chunk = abi.GetChunkUnchecked(page_idx, layout, chunk_idx);
+        uint16_t num_packets;
+        uint8_t flags;
+        std::tie(num_packets, flags) = chunk.GetPacketCountAndFlags();
+        const uint8_t* ptr = chunk.payload_begin();
 
-void ServiceImpl::ReadBuffers(ConsumerEndpointImpl*) {
-  PERFETTO_DLOG("not implemented yet");
+        // shared_ptr is really a workardound for the fact that is not possible
+        // to std::move() move-only types in labmdas until C++17.
+        std::shared_ptr<std::vector<TracePacket>> packets(
+            new std::vector<TracePacket>());
+        packets->reserve(num_packets);
+
+        for (size_t pack_idx = 0; pack_idx < num_packets; pack_idx++) {
+          uint64_t pack_size = 0;
+          ptr = ParseVarInt(ptr, chunk.end(), &pack_size);
+          // TODO stitching, look at the flags.
+          bool skip = (pack_idx == 0 &&
+                       flags & SharedMemoryABI::ChunkHeader::
+                                   kFirstPacketContinuesFromPrevChunk) ||
+                      (pack_idx == num_packets - 1 &&
+                       flags & SharedMemoryABI::ChunkHeader::
+                                   kLastPacketContinuesOnNextChunk);
+
+          PERFETTO_DLOG("  #%-3zu len:%" PRIu64 " skip: %d\n", pack_idx,
+                        pack_size, skip);
+          if (ptr > chunk.end() - pack_size) {
+            PERFETTO_DLOG("out of bounds!\n");
+            break;
+          }
+          if (!skip) {
+            packets->emplace_back();
+            packets->back().AddChunk(Chunk(ptr, pack_size));
+          }
+          ptr += pack_size;
+        }  // for(packet)
+        task_runner_->PostTask([weak_consumer, packets]() {
+          if (weak_consumer)
+            weak_consumer->consumer()->OnTraceData(*packets, true /*has_more*/);
+        });
+      }  // for(chunk)
+    }    // for(page_idx)
+  }      // for(buffer_id)
+  task_runner_->PostTask([weak_consumer]() {
+    if (weak_consumer)
+      weak_consumer->consumer()->OnTraceData(std::vector<TracePacket>(),
+                                             false /*has_more*/);
+  });
 }
 
 void ServiceImpl::FreeBuffers(ConsumerEndpointImpl*) {
+  // TODO(primiano): implement here.
   PERFETTO_DLOG("not implemented yet");
 }
 
+void ServiceImpl::RegisterDataSource(ProducerID producer_id,
+                                     DataSourceID ds_id,
+                                     const DataSourceDescriptor& desc) {
+  PERFETTO_DCHECK(!desc.name().empty());
+  data_sources_.emplace(desc.name(),
+                        RegisteredDataSource{producer_id, ds_id, desc});
+}
+
+void ServiceImpl::CopyProducerPageIntoLogBuffer(ProducerID producer_id,
+                                                BufferID target_buffer,
+                                                const uint8_t* src,
+                                                size_t size) {
+  // TODO right now the page_size in the SMB and the trace_buffers_ can
+  // mismatch. Remove the ability to decide the page size on the Producer.
+
+  // TODO(primiano): We should have a direct index to find the TargetBuffer and
+  // perform ACL checks without iterating through all the producers.
+  TraceBuffer* tbuf = nullptr;
+  for (auto& sessions_it : tracing_sessions_) {
+    for (auto& tbuf_it : sessions_it.second.trace_buffers) {
+      const BufferID id = tbuf_it.first;
+      if (id == target_buffer) {
+        // TODO(primiano): we should have some stronger check to prevent that
+        // the Producer passes |target_buffer| which is valid, but that we never
+        // asked it to use. Essentially we want to prevent a malicious producer
+        // to inject data into a log buffer that has nothing to do with it.
+        tbuf = &tbuf_it.second;
+        break;
+      }
+    }
+  }
+
+  if (!tbuf) {
+    PERFETTO_DLOG("Could not find target buffer %u for producer %" PRIu64,
+                  target_buffer, producer_id);
+    return;
+  }
+
+  PERFETTO_DCHECK(size == TraceBuffer::kBufferPageSize);
+  uint8_t* dst = tbuf->get_next_page();
+
+  // TODO(primiano): use sendfile(). Requires to make the tbuf itself
+  // a file descriptor (just use SharedMemory without sharing it).
+  PERFETTO_DLOG("Copying page %p from producer %" PRIu64,
+                reinterpret_cast<const void*>(src), producer_id);
+  memcpy(dst, src, size);
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ServiceImpl::ConsumerEndpointImpl implementation
 ////////////////////////////////////////////////////////////////////////////////
@@ -170,7 +366,14 @@
       service_(service),
       task_runner_(task_runner),
       producer_(std::move(producer)),
-      shared_memory_(std::move(shared_memory)) {}
+      shared_memory_(std::move(shared_memory)),
+      shmem_abi_(reinterpret_cast<uint8_t*>(shared_memory_->start()),
+                 shared_memory_->size(),
+                 kPageSize) {
+  // TODO(primiano): make the page-size for the SHM dynamic and find a way to
+  // communicate that to the Producer (add a field to the
+  // InitializeConnectionResponse IPC).
+}
 
 ServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
   producer_->OnDisconnect();
@@ -178,23 +381,48 @@
 }
 
 void ServiceImpl::ProducerEndpointImpl::RegisterDataSource(
-    const DataSourceDescriptor&,
+    const DataSourceDescriptor& desc,
     RegisterDataSourceCallback callback) {
-  const DataSourceID dsid = ++last_data_source_id_;
-  task_runner_->PostTask(std::bind(std::move(callback), dsid));
-  // TODO implement the bookkeeping logic.
+  DataSourceID ds_id = ++last_data_source_id_;
+  if (!desc.name().empty()) {
+    service_->RegisterDataSource(id_, ds_id, desc);
+  } else {
+    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
+    ds_id = 0;
+  }
+  task_runner_->PostTask(std::bind(std::move(callback), ds_id));
 }
 
 void ServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
     DataSourceID dsid) {
   PERFETTO_CHECK(dsid);
-  // TODO implement the bookkeeping logic.
+  // TODO(primiano): implement the bookkeeping logic.
 }
 
 void ServiceImpl::ProducerEndpointImpl::NotifySharedMemoryUpdate(
     const std::vector<uint32_t>& changed_pages) {
-  // TODO implement the bookkeeping logic.
-  return;
+  for (uint32_t page_idx : changed_pages) {
+    if (page_idx >= shmem_abi_.num_pages())
+      continue;  // Very likely a malicious producer playing dirty.
+
+    if (!shmem_abi_.is_page_complete(page_idx))
+      continue;
+    if (!shmem_abi_.TryAcquireAllChunksForReading(page_idx))
+      continue;
+
+    // TODO: we should start collecting individual chunks from non fully
+    // complete pages after a while.
+
+    service_->CopyProducerPageIntoLogBuffer(
+        id_, shmem_abi_.get_target_buffer(page_idx),
+        shmem_abi_.page_start(page_idx), shmem_abi_.page_size());
+
+    shmem_abi_.ReleaseAllChunksAsFree(page_idx);
+  }
+}
+
+SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const {
+  return shared_memory_.get();
 }
 
 std::unique_ptr<TraceWriter>
@@ -206,8 +434,29 @@
   PERFETTO_CHECK(false);
 }
 
-SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const {
-  return shared_memory_.get();
+////////////////////////////////////////////////////////////////////////////////
+// ServiceImpl::TraceBuffer implementation
+////////////////////////////////////////////////////////////////////////////////
+
+ServiceImpl::TraceBuffer::TraceBuffer(size_t sz) : size(sz) {
+  void* ptr = nullptr;
+  PERFETTO_CHECK(size % kBufferPageSize == 0);
+  // TODO(primiano): introduce base::PageAllocator and use mmap() instead.
+  if (posix_memalign(&ptr, kPageSize, size)) {
+    PERFETTO_ELOG("Trace buffer allocation failed (size: %zu, page_size: %zu)",
+                  size, kBufferPageSize);
+    return;
+  }
+  PERFETTO_CHECK(ptr);
+  memset(ptr, 0, size);
+  data.reset(ptr);
+  abi.reset(new SharedMemoryABI(get_page(0), size, kBufferPageSize));
 }
 
+ServiceImpl::TraceBuffer::~TraceBuffer() = default;
+ServiceImpl::TraceBuffer::TraceBuffer(ServiceImpl::TraceBuffer&&) noexcept =
+    default;
+ServiceImpl::TraceBuffer& ServiceImpl::TraceBuffer::operator=(
+    ServiceImpl::TraceBuffer&&) = default;
+
 }  // namespace perfetto
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index 91db0fb..52b1277 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -22,9 +22,13 @@
 #include <memory>
 #include <set>
 
+#include "perfetto/base/utils.h"
 #include "perfetto/base/weak_ptr.h"
 #include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
 #include "perfetto/tracing/core/service.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "src/tracing/core/id_allocator.h"
 
 namespace perfetto {
 
@@ -71,6 +75,7 @@
     base::TaskRunner* const task_runner_;
     Producer* producer_;
     std::unique_ptr<SharedMemory> shared_memory_;
+    SharedMemoryABI shmem_abi_;
     DataSourceID last_data_source_id_ = 0;
   };
 
@@ -104,6 +109,13 @@
 
   // Called by ProducerEndpointImpl.
   void DisconnectProducer(ProducerID);
+  void RegisterDataSource(ProducerID,
+                          DataSourceID,
+                          const DataSourceDescriptor&);
+  void CopyProducerPageIntoLogBuffer(ProducerID,
+                                     BufferID,
+                                     const uint8_t*,
+                                     size_t);
 
   // Called by ConsumerEndpointImpl.
   void DisconnectConsumer(ConsumerEndpointImpl*);
@@ -125,14 +137,76 @@
   ProducerEndpointImpl* GetProducer(ProducerID) const;
 
  private:
+  struct RegisteredDataSource {
+    ProducerID producer_id;
+    DataSourceID data_source_id;
+    DataSourceDescriptor descriptor;
+  };
+
+  struct TraceBuffer {
+    // TODO(primiano): make this configurable.
+    static constexpr size_t kBufferPageSize = 4096;
+    explicit TraceBuffer(size_t size);
+    ~TraceBuffer();
+    TraceBuffer(TraceBuffer&&) noexcept;
+    TraceBuffer& operator=(TraceBuffer&&);
+
+    size_t num_pages() const { return size / kBufferPageSize; }
+
+    uint8_t* get_page(size_t page) {
+      PERFETTO_DCHECK(page < num_pages());
+      return reinterpret_cast<uint8_t*>(data.get()) + page * kBufferPageSize;
+    }
+
+    uint8_t* get_next_page() {
+      size_t cur = cur_page;
+      cur_page = cur_page == num_pages() - 1 ? 0 : cur_page + 1;
+      return get_page(cur);
+    }
+
+    size_t size;
+    size_t cur_page = 0;  // Write pointer in the ring buffer.
+    std::unique_ptr<void, base::FreeDeleter> data;
+
+    // TODO(primiano): The TraceBuffer is not shared and there is no reason to
+    // use the SharedMemoryABI. This is just a a temporary workaround to reuse
+    // the convenience of SharedMemoryABI for bookkeeping of the buffer when
+    // implementing ReadBuffers().
+    std::unique_ptr<SharedMemoryABI> abi;
+  };
+
+  // Holds the state of a tracing session. A tracing session is uniquely bound
+  // a specific Consumer. Each Consumer can own one or more sessions.
+  struct TracingSession {
+    // List of data source instances that have been enabled on the various
+    // producers for this tracing session.
+    std::multimap<ProducerID, DataSourceInstanceID> data_source_instances;
+
+    // The key of this map matches the |target_buffer| in the
+    // SharedMemoryABI::ChunkHeader.
+    std::map<BufferID, TraceBuffer> trace_buffers;
+  };
+
   ServiceImpl(const ServiceImpl&) = delete;
   ServiceImpl& operator=(const ServiceImpl&) = delete;
 
-  std::unique_ptr<SharedMemory::Factory> shm_factory_;
   base::TaskRunner* const task_runner_;
+  std::unique_ptr<SharedMemory::Factory> shm_factory_;
   ProducerID last_producer_id_ = 0;
+  DataSourceInstanceID last_data_source_instance_id_ = 0;
+
+  // Buffer IDs are global across all consumers (because a Producer can produce
+  // data for more than one trace session, hence more than one consumer).
+  IdAllocator buffer_ids_;
+
+  std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
+
+  // TODO(primiano): There doesn't seem to be any good reason why |producers_|
+  // is a map indexed by ID and not just a set<ProducerEndpointImpl*>.
   std::map<ProducerID, ProducerEndpointImpl*> producers_;
+
   std::set<ConsumerEndpointImpl*> consumers_;
+  std::map<ConsumerEndpointImpl*, TracingSession> tracing_sessions_;
 };
 
 }  // namespace perfetto
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
new file mode 100644
index 0000000..2747e49
--- /dev/null
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/ipc/posix_shared_memory.h"
+
+#include <inttypes.h>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "perfetto/tracing/core/consumer.h"
+#include "perfetto/tracing/core/data_source_config.h"
+#include "perfetto/tracing/core/data_source_descriptor.h"
+#include "perfetto/tracing/core/producer.h"
+#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/trace_packet.h"
+#include "perfetto/tracing/core/trace_writer.h"
+#include "perfetto/tracing/ipc/consumer_ipc_client.h"
+#include "perfetto/tracing/ipc/producer_ipc_client.h"
+#include "perfetto/tracing/ipc/service_ipc_host.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/ipc/test/test_socket.h"
+
+#include "protos/test_event.pbzero.h"
+#include "protos/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace {
+
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
+using testing::_;
+
+constexpr char kProducerSockName[] = TEST_SOCK_NAME("tracing_test-producer");
+constexpr char kConsumerSockName[] = TEST_SOCK_NAME("tracing_test-consumer");
+
+class TracingIntegrationTest : public ::testing::Test {
+ public:
+  void SetUp() override {
+    DESTROY_TEST_SOCK(kProducerSockName);
+    DESTROY_TEST_SOCK(kConsumerSockName);
+    task_runner_.reset(new base::TestTaskRunner());
+  }
+
+  void TearDown() override {
+    task_runner_.reset();
+    DESTROY_TEST_SOCK(kProducerSockName);
+    DESTROY_TEST_SOCK(kConsumerSockName);
+  }
+
+  std::unique_ptr<base::TestTaskRunner> task_runner_;
+};
+
+class MockProducer : public Producer {
+ public:
+  ~MockProducer() override {}
+
+  // Producer implementation.
+  MOCK_METHOD0(OnConnect, void());
+  MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD2(CreateDataSourceInstance,
+               void(DataSourceInstanceID, const DataSourceConfig&));
+  MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
+};
+
+class MockConsumer : public Consumer {
+ public:
+  ~MockConsumer() override {}
+
+  // Producer implementation.
+  MOCK_METHOD0(OnConnect, void());
+  MOCK_METHOD0(OnDisconnect, void());
+  MOCK_METHOD2(OnTraceData,
+               void(const std::vector<TracePacket>&, bool /*has_more*/));
+};
+
+TEST_F(TracingIntegrationTest, WithIPCTransport) {
+  // Create the service host.
+  std::unique_ptr<ServiceIPCHost> svc =
+      ServiceIPCHost::CreateInstance(task_runner_.get());
+  svc->Start(kProducerSockName, kConsumerSockName);
+
+  // Create and connect a Producer.
+  MockProducer producer;
+  std::unique_ptr<Service::ProducerEndpoint> producer_endpoint =
+      ProducerIPCClient::Connect(kProducerSockName, &producer,
+                                 task_runner_.get());
+  auto on_producer_connect =
+      task_runner_->CreateCheckpoint("on_producer_connect");
+  EXPECT_CALL(producer, OnConnect()).WillOnce(Invoke(on_producer_connect));
+  task_runner_->RunUntilCheckpoint("on_producer_connect");
+
+  // Register a data source.
+  DataSourceDescriptor ds_desc;
+  ds_desc.set_name("perfetto.test");
+  auto on_data_source_registered =
+      task_runner_->CreateCheckpoint("on_data_source_registered");
+  producer_endpoint->RegisterDataSource(
+      ds_desc, [on_data_source_registered](DataSourceID dsid) {
+        PERFETTO_DLOG("Registered data source with ID: %" PRIu64, dsid);
+        on_data_source_registered();
+      });
+  task_runner_->RunUntilCheckpoint("on_data_source_registered");
+
+  // Create and connect a Consumer.
+  MockConsumer consumer;
+  std::unique_ptr<Service::ConsumerEndpoint> consumer_endpoint =
+      ConsumerIPCClient::Connect(kConsumerSockName, &consumer,
+                                 task_runner_.get());
+  auto on_consumer_connect =
+      task_runner_->CreateCheckpoint("on_consumer_connect");
+  EXPECT_CALL(consumer, OnConnect()).WillOnce(Invoke(on_consumer_connect));
+  task_runner_->RunUntilCheckpoint("on_consumer_connect");
+
+  // Start tracing.
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(4096 * 10);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("perfetto.test");
+  ds_config->set_target_buffer(0);
+  ds_config->set_trace_category_filters("foo,bar");
+  consumer_endpoint->EnableTracing(trace_config);
+
+  // At this point, the Producer should be asked to turn its data source on.
+  DataSourceInstanceID ds_iid = 0;
+  auto on_create_ds_instance =
+      task_runner_->CreateCheckpoint("on_create_ds_instance");
+  EXPECT_CALL(producer, CreateDataSourceInstance(_, _))
+      .WillOnce(
+          Invoke([on_create_ds_instance, &ds_iid](DataSourceInstanceID id,
+                                                  const DataSourceConfig& cfg) {
+            ASSERT_NE(0u, id);
+            ds_iid = id;
+            ASSERT_EQ("perfetto.test", cfg.name());
+            ASSERT_EQ(0u, cfg.target_buffer());
+            ASSERT_EQ("foo,bar", cfg.trace_category_filters());
+            on_create_ds_instance();
+          }));
+  task_runner_->RunUntilCheckpoint("on_create_ds_instance");
+
+  // Now let the data source fill some pages within the same task.
+  // Doing so should accumulate a bunch of chunks that will be notified by the
+  // a future task in one batch.
+  std::unique_ptr<TraceWriter> writer =
+      producer_endpoint->CreateTraceWriter(1 /* target_buffer */);
+  ASSERT_TRUE(writer);
+
+  const size_t kNumPackets = 10;
+  for (size_t i = 0; i < kNumPackets; i++) {
+    char buf[8];
+    sprintf(buf, "evt_%zu", i);
+    writer->NewTracePacket()->set_test_event()->set_str(buf, strlen(buf));
+  }
+
+  // Allow the service to see the NotifySharedMemoryUpdate() before disabling
+  // tracing.
+  task_runner_->RunUntilIdle();
+
+  // Disable tracing.
+  consumer_endpoint->DisableTracing();
+  auto on_teardown_ds_instance =
+      task_runner_->CreateCheckpoint("on_teardown_ds_instance");
+  EXPECT_CALL(producer, TearDownDataSourceInstance(ds_iid))
+      .WillOnce(InvokeWithoutArgs(on_teardown_ds_instance));
+  task_runner_->RunUntilCheckpoint("on_teardown_ds_instance");
+
+  // Read the log buffer.
+  consumer_endpoint->ReadBuffers();
+  size_t num_pack_rx = 0;
+  auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
+  EXPECT_CALL(consumer, OnTraceData(_, _))
+      .Times(kNumPackets)
+      .WillRepeatedly(
+          Invoke([&num_pack_rx, all_packets_rx](
+                     const std::vector<TracePacket>& packets, bool has_more) {
+            // TODO(primiano): check contents, requires both pblite and pzero.
+            num_pack_rx += packets.size();
+            if (!has_more)
+              all_packets_rx();
+          }));
+  task_runner_->RunUntilCheckpoint("all_packets_rx");
+
+  // TODO(primiano): cover FreeBuffers.
+
+  // Destroy the service and check that both Producer and Consumer see an
+  // OnDisconnect() call.
+
+  auto on_producer_disconnect =
+      task_runner_->CreateCheckpoint("on_producer_disconnect");
+  EXPECT_CALL(producer, OnDisconnect())
+      .WillOnce(Invoke(on_producer_disconnect));
+
+  auto on_consumer_disconnect =
+      task_runner_->CreateCheckpoint("on_consumer_disconnect");
+  EXPECT_CALL(consumer, OnDisconnect())
+      .WillOnce(Invoke(on_consumer_disconnect));
+
+  svc.reset();
+  task_runner_->RunUntilCheckpoint("on_producer_disconnect");
+  task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
+}
+
+// TODO(primiano): add tests to cover:
+// - unknown fields preserved end-to-end.
+// - >1 data source.
+// - >1 data consumer sharing the same data source, with different TraceBuffers.
+
+}  // namespace
+}  // namespace perfetto