Add sync marker traces to allow fast parsing

Periodically writes a fixed 128 bits sync marker to traces,
every 10s, together with the clock snapshots. This is to
allow concurrent offline processing of traces, by being able to
quickly seek the markers and partitioning the trace.

Test: perfetto_unittests --gtest_filter=*ResynchronizeTraceStreamUsingSyncMarker
Change-Id: If598da06c165210e28416c33aeb5cd00bc84b065
diff --git a/protos/perfetto/trace/perfetto_trace.proto b/protos/perfetto/trace/perfetto_trace.proto
index 97267eb..b83ecf0 100644
--- a/protos/perfetto/trace/perfetto_trace.proto
+++ b/protos/perfetto/trace/perfetto_trace.proto
@@ -46,6 +46,12 @@
     // removed field with id 34
     // removed field with id 35
 
+    // This field is emitted at periodic intervals (~10s) and
+    // contains always the binary representation of the UUID
+    // {82477a76-b28d-42ba-81dc-33326d57a079}. This is used to be able to
+    // efficiently partition long traces without having to fully parse them.
+    bytes synchronization_marker = 36;
+
     // This field is only used for testing.
     // removed field with id 536870911  // 2^29 - 1, max field id for protos.
   }
diff --git a/protos/perfetto/trace/trace_packet.proto b/protos/perfetto/trace/trace_packet.proto
index 9a5f494..553ade2 100644
--- a/protos/perfetto/trace/trace_packet.proto
+++ b/protos/perfetto/trace/trace_packet.proto
@@ -48,6 +48,12 @@
     FtraceStats ftrace_stats = 34;
     TraceStats trace_stats = 35;
 
+    // This field is emitted at periodic intervals (~10s) and
+    // contains always the binary representation of the UUID
+    // {82477a76-b28d-42ba-81dc-33326d57a079}. This is used to be able to
+    // efficiently partition long traces without having to fully parse them.
+    bytes synchronization_marker = 36;
+
     // This field is only used for testing.
     TestEvent for_testing = 536870911;  // 2^29 - 1, max field id for protos.
   }
diff --git a/protos/perfetto/trace/trusted_packet.proto b/protos/perfetto/trace/trusted_packet.proto
index 36262c8..758b3ca 100644
--- a/protos/perfetto/trace/trusted_packet.proto
+++ b/protos/perfetto/trace/trusted_packet.proto
@@ -49,4 +49,5 @@
   ClockSnapshot clock_snapshot = 6;
   TraceConfig trace_config = 33;
   TraceStats trace_stats = 35;
+  bytes synchronization_marker = 36;
 }
diff --git a/src/tracing/core/packet_stream_validator.cc b/src/tracing/core/packet_stream_validator.cc
index 5368999..3e6f1bd 100644
--- a/src/tracing/core/packet_stream_validator.cc
+++ b/src/tracing/core/packet_stream_validator.cc
@@ -50,6 +50,9 @@
   if (packet.has_trace_stats())
     return false;
 
+  if (!packet.synchronization_marker().empty())
+    return false;
+
   // We are deliberately not checking for clock_snapshot for the moment. It's
   // unclear if we want to allow producers to snapshot their clocks. Ideally we
   // want a security model where producers can only snapshot their own clocks
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 14960fa..2ffe0bd 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -66,6 +66,7 @@
     svc.reset(static_cast<TracingServiceImpl*>(
         TracingService::CreateInstance(std::move(shm_factory), &task_runner)
             .release()));
+    svc->min_write_period_ms_ = 1;
   }
 
   std::unique_ptr<MockProducer> CreateMockProducer() {
@@ -84,11 +85,25 @@
     return svc->GetProducer(producer_id)->uid_;
   }
 
+  TracingServiceImpl::TracingSession* tracing_session() {
+    auto* session = svc->GetTracingSession(svc->last_tracing_session_id_);
+    EXPECT_NE(nullptr, session);
+    return session;
+  }
+
   size_t GetNumPendingFlushes() {
-    TracingServiceImpl::TracingSession* tracing_session =
-        svc->GetTracingSession(svc->last_tracing_session_id_);
-    EXPECT_NE(nullptr, tracing_session);
-    return tracing_session->pending_flushes.size();
+    return tracing_session()->pending_flushes.size();
+  }
+
+  void WaitForNextSyncMarker() {
+    tracing_session()->last_snapshot_time = base::TimeMillis(0);
+    static int attempt = 0;
+    while (tracing_session()->last_snapshot_time == base::TimeMillis(0)) {
+      auto checkpoint_name = "wait_snapshot_" + std::to_string(attempt++);
+      auto timer_expired = task_runner.CreateCheckpoint(checkpoint_name);
+      task_runner.PostDelayedTask([timer_expired] { timer_expired(); }, 1);
+      task_runner.RunUntilCheckpoint(checkpoint_name);
+    }
   }
 
   base::TestTaskRunner task_runner;
@@ -688,4 +703,78 @@
     consumer->FreeBuffers();
   }
 }
+
+// Writes a long trace and then tests that the trace parsed in partitions
+// derived by the synchronization markers is identical to the whole trace parsed
+// in one go.
+TEST_F(TracingServiceImplTest, ResynchronizeTraceStreamUsingSyncMarker) {
+  // Setup tracing.
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(4096);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+  trace_config.set_write_into_file(true);
+  trace_config.set_file_write_period_ms(1);
+  base::TempFile tmp_file = base::TempFile::Create();
+  consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  // Write some variable length payload, waiting for sync markers every now
+  // and then.
+  const int kNumMarkers = 5;
+  auto writer = producer->CreateTraceWriter("data_source");
+  for (int i = 1; i <= 100; i++) {
+    std::string payload(i, 'A' + (i % 25));
+    writer->NewTracePacket()->set_for_testing()->set_str(payload.c_str());
+    if (i % (100 / kNumMarkers) == 0) {
+      writer->Flush();
+      WaitForNextSyncMarker();
+    }
+  }
+  writer->Flush();
+  writer.reset();
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+
+  std::string trace_raw;
+  ASSERT_TRUE(base::ReadFile(tmp_file.path().c_str(), &trace_raw));
+
+  const auto kMarkerSize = sizeof(TracingServiceImpl::kSyncMarker);
+  const std::string kSyncMarkerStr(
+      reinterpret_cast<const char*>(TracingServiceImpl::kSyncMarker),
+      kMarkerSize);
+
+  // Read back the trace in partitions derived from the marker.
+  // The trace should look like this:
+  // [uid, marker] [event] [event] [uid, marker] [event] [event]
+  size_t num_markers = 0;
+  size_t start = 0;
+  size_t end = 0;
+  protos::Trace merged_trace;
+  for (size_t pos = 0; pos != std::string::npos; start = end) {
+    pos = trace_raw.find(kSyncMarkerStr, pos + 1);
+    num_markers++;
+    end = (pos == std::string::npos) ? trace_raw.size() : pos + kMarkerSize;
+    int size = static_cast<int>(end - start);
+    ASSERT_GT(size, 0);
+    protos::Trace trace_partition;
+    ASSERT_TRUE(trace_partition.ParseFromArray(trace_raw.data() + start, size));
+    merged_trace.MergeFrom(trace_partition);
+  }
+  EXPECT_GE(num_markers, kNumMarkers);
+
+  protos::Trace whole_trace;
+  ASSERT_TRUE(whole_trace.ParseFromString(trace_raw));
+
+  ASSERT_EQ(whole_trace.packet_size(), merged_trace.packet_size());
+  EXPECT_EQ(whole_trace.SerializeAsString(), merged_trace.SerializeAsString());
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index b4d6775..ef344a9 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -57,9 +57,7 @@
 namespace {
 constexpr size_t kDefaultShmPageSize = base::kPageSize;
 constexpr int kMaxBuffersPerConsumer = 128;
-constexpr base::TimeMillis kClockSnapshotInterval(10 * 1000);
-constexpr base::TimeMillis kStatsSnapshotInterval(10 * 1000);
-constexpr int kMinWriteIntoFilePeriodMs = 100;
+constexpr base::TimeMillis kSnapshotsInterval(10 * 1000);
 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
 constexpr int kFlushTimeoutMs = 1000;
 constexpr int kMaxConcurrentTracingSessions = 5;
@@ -106,6 +104,7 @@
 constexpr size_t TracingServiceImpl::kDefaultShmSize;
 constexpr size_t TracingServiceImpl::kMaxShmSize;
 constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
+constexpr uint8_t TracingServiceImpl::kSyncMarker[];
 
 // static
 std::unique_ptr<TracingService> TracingService::CreateInstance(
@@ -286,8 +285,8 @@
     uint32_t write_period_ms = cfg.file_write_period_ms();
     if (write_period_ms == 0)
       write_period_ms = kDefaultWriteIntoFilePeriodMs;
-    if (write_period_ms < kMinWriteIntoFilePeriodMs)
-      write_period_ms = kMinWriteIntoFilePeriodMs;
+    if (write_period_ms < min_write_period_ms_)
+      write_period_ms = min_write_period_ms_;
     tracing_session->write_period_ms = write_period_ms;
     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
     tracing_session->bytes_written_into_file = 0;
@@ -648,8 +647,14 @@
 
   std::vector<TracePacket> packets;
   packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.
-  MaybeSnapshotClocks(tracing_session, &packets);
-  MaybeSnapshotStats(tracing_session, &packets);
+
+  base::TimeMillis now = base::GetWallTimeMs();
+  if (now >= tracing_session->last_snapshot_time + kSnapshotsInterval) {
+    tracing_session->last_snapshot_time = now;
+    SnapshotSyncMarker(&packets);
+    SnapshotClocks(&packets);
+    SnapshotStats(tracing_session, &packets);
+  }
   MaybeEmitTraceConfig(tracing_session, &packets);
 
   size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
@@ -1137,14 +1142,32 @@
 #endif
 }
 
-void TracingServiceImpl::MaybeSnapshotClocks(
-    TracingSession* tracing_session,
-    std::vector<TracePacket>* packets) {
-  base::TimeMillis now = base::GetWallTimeMs();
-  if (now < tracing_session->last_clock_snapshot + kClockSnapshotInterval)
-    return;
-  tracing_session->last_clock_snapshot = now;
+void TracingServiceImpl::SnapshotSyncMarker(std::vector<TracePacket>* packets) {
+  // The sync markes is used to tokenize large traces efficiently.
+  // See description in trace_packet.proto.
+  if (sync_marker_packet_size_ == 0) {
+    // Serialize the marker and the uid separately to guarantee that the marker
+    // is serialzied at the end and is adjacent to the start of the next packet.
+    int size_left = static_cast<int>(sizeof(sync_marker_packet_));
+    uint8_t* dst = &sync_marker_packet_[0];
+    protos::TrustedPacket packet;
+    packet.set_trusted_uid(static_cast<int32_t>(uid_));
+    PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
+    size_left -= packet.ByteSize();
+    sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
+    dst += sync_marker_packet_size_;
 
+    packet.Clear();
+    packet.set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
+    PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
+    sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
+    PERFETTO_CHECK(sync_marker_packet_size_ <= sizeof(sync_marker_packet_));
+  };
+  packets->emplace_back();
+  packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
+}
+
+void TracingServiceImpl::SnapshotClocks(std::vector<TracePacket>* packets) {
   protos::TrustedPacket packet;
   protos::ClockSnapshot* clock_snapshot = packet.mutable_clock_snapshot();
 
@@ -1198,13 +1221,8 @@
   packets->back().AddSlice(std::move(slice));
 }
 
-void TracingServiceImpl::MaybeSnapshotStats(TracingSession* tracing_session,
-                                            std::vector<TracePacket>* packets) {
-  base::TimeMillis now = base::GetWallTimeMs();
-  if (now < tracing_session->last_stats_snapshot + kStatsSnapshotInterval)
-    return;
-  tracing_session->last_stats_snapshot = now;
-
+void TracingServiceImpl::SnapshotStats(TracingSession* tracing_session,
+                                       std::vector<TracePacket>* packets) {
   protos::TrustedPacket packet;
   packet.set_trusted_uid(static_cast<int32_t>(uid_));
 
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index a4dd848..62961b3 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -56,6 +56,9 @@
   static constexpr size_t kDefaultShmSize = 256 * 1024ul;
   static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;
   static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;
+  static constexpr uint8_t kSyncMarker[] = {0x82, 0x47, 0x7a, 0x76, 0xb2, 0x8d,
+                                            0x42, 0xba, 0x81, 0xdc, 0x33, 0x32,
+                                            0x6d, 0x57, 0xa0, 0x79};
 
   // The implementation behind the service endpoint exposed to each producer.
   class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
@@ -252,11 +255,9 @@
     // many entries as |config.buffers_size()|.
     std::vector<BufferID> buffers_index;
 
-    // When the last clock snapshot was emitted into the output stream.
-    base::TimeMillis last_clock_snapshot = {};
-
-    // When the last TraceStats snapshot was emitted into the output stream.
-    base::TimeMillis last_stats_snapshot = {};
+    // When the last snapshots (clock, stats, sync marker) were emitted into
+    // the output stream.
+    base::TimeMillis last_snapshot_time = {};
 
     // Whether we mirrored the trace config back to the trace output yet.
     bool did_emit_config = false;
@@ -292,9 +293,10 @@
   // shared memory and trace buffers.
   void UpdateMemoryGuardrail();
 
-  void MaybeSnapshotClocks(TracingSession*, std::vector<TracePacket>*);
+  void SnapshotSyncMarker(std::vector<TracePacket>*);
+  void SnapshotClocks(std::vector<TracePacket>*);
+  void SnapshotStats(TracingSession*, std::vector<TracePacket>*);
   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
-  void MaybeSnapshotStats(TracingSession*, std::vector<TracePacket>*);
   void OnFlushTimeout(TracingSessionID, FlushRequestID);
   void OnDisableTracingTimeout(TracingSessionID);
   void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
@@ -319,6 +321,10 @@
   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
 
   bool lockdown_mode_ = false;
+  uint32_t min_write_period_ms_ = 100;  // Overridable for testing.
+
+  uint8_t sync_marker_packet_[32];  // Lazily initialized.
+  size_t sync_marker_packet_size_ = 0;
 
   PERFETTO_THREAD_CHECKER(thread_checker_)
 
diff --git a/test/test_helper.cc b/test/test_helper.cc
index ad01ced..51ae9aa 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -61,7 +61,7 @@
     protos::TracePacket packet;
     ASSERT_TRUE(encoded_packet.Decode(&packet));
     if (packet.has_clock_snapshot() || packet.has_trace_config() ||
-        packet.has_trace_stats()) {
+        packet.has_trace_stats() || !packet.synchronization_marker().empty()) {
       continue;
     }
     ASSERT_EQ(protos::TracePacket::kTrustedUid,