Add sync marker traces to allow fast parsing

Periodically writes a fixed 128 bits sync marker to traces,
every 10s, together with the clock snapshots. This is to
allow concurrent offline processing of traces, by being able to
quickly seek the markers and partitioning the trace.

Test: perfetto_unittests --gtest_filter=*ResynchronizeTraceStreamUsingSyncMarker
Change-Id: If598da06c165210e28416c33aeb5cd00bc84b065
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 14960fa..2ffe0bd 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -66,6 +66,7 @@
     svc.reset(static_cast<TracingServiceImpl*>(
         TracingService::CreateInstance(std::move(shm_factory), &task_runner)
             .release()));
+    svc->min_write_period_ms_ = 1;
   }
 
   std::unique_ptr<MockProducer> CreateMockProducer() {
@@ -84,11 +85,25 @@
     return svc->GetProducer(producer_id)->uid_;
   }
 
+  TracingServiceImpl::TracingSession* tracing_session() {
+    auto* session = svc->GetTracingSession(svc->last_tracing_session_id_);
+    EXPECT_NE(nullptr, session);
+    return session;
+  }
+
   size_t GetNumPendingFlushes() {
-    TracingServiceImpl::TracingSession* tracing_session =
-        svc->GetTracingSession(svc->last_tracing_session_id_);
-    EXPECT_NE(nullptr, tracing_session);
-    return tracing_session->pending_flushes.size();
+    return tracing_session()->pending_flushes.size();
+  }
+
+  void WaitForNextSyncMarker() {
+    tracing_session()->last_snapshot_time = base::TimeMillis(0);
+    static int attempt = 0;
+    while (tracing_session()->last_snapshot_time == base::TimeMillis(0)) {
+      auto checkpoint_name = "wait_snapshot_" + std::to_string(attempt++);
+      auto timer_expired = task_runner.CreateCheckpoint(checkpoint_name);
+      task_runner.PostDelayedTask([timer_expired] { timer_expired(); }, 1);
+      task_runner.RunUntilCheckpoint(checkpoint_name);
+    }
   }
 
   base::TestTaskRunner task_runner;
@@ -688,4 +703,78 @@
     consumer->FreeBuffers();
   }
 }
+
+// Writes a long trace and then tests that the trace parsed in partitions
+// derived by the synchronization markers is identical to the whole trace parsed
+// in one go.
+TEST_F(TracingServiceImplTest, ResynchronizeTraceStreamUsingSyncMarker) {
+  // Setup tracing.
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(4096);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+  trace_config.set_write_into_file(true);
+  trace_config.set_file_write_period_ms(1);
+  base::TempFile tmp_file = base::TempFile::Create();
+  consumer->EnableTracing(trace_config, base::ScopedFile(dup(tmp_file.fd())));
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceStart("data_source");
+
+  // Write some variable length payload, waiting for sync markers every now
+  // and then.
+  const int kNumMarkers = 5;
+  auto writer = producer->CreateTraceWriter("data_source");
+  for (int i = 1; i <= 100; i++) {
+    std::string payload(i, 'A' + (i % 25));
+    writer->NewTracePacket()->set_for_testing()->set_str(payload.c_str());
+    if (i % (100 / kNumMarkers) == 0) {
+      writer->Flush();
+      WaitForNextSyncMarker();
+    }
+  }
+  writer->Flush();
+  writer.reset();
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+
+  std::string trace_raw;
+  ASSERT_TRUE(base::ReadFile(tmp_file.path().c_str(), &trace_raw));
+
+  const auto kMarkerSize = sizeof(TracingServiceImpl::kSyncMarker);
+  const std::string kSyncMarkerStr(
+      reinterpret_cast<const char*>(TracingServiceImpl::kSyncMarker),
+      kMarkerSize);
+
+  // Read back the trace in partitions derived from the marker.
+  // The trace should look like this:
+  // [uid, marker] [event] [event] [uid, marker] [event] [event]
+  size_t num_markers = 0;
+  size_t start = 0;
+  size_t end = 0;
+  protos::Trace merged_trace;
+  for (size_t pos = 0; pos != std::string::npos; start = end) {
+    pos = trace_raw.find(kSyncMarkerStr, pos + 1);
+    num_markers++;
+    end = (pos == std::string::npos) ? trace_raw.size() : pos + kMarkerSize;
+    int size = static_cast<int>(end - start);
+    ASSERT_GT(size, 0);
+    protos::Trace trace_partition;
+    ASSERT_TRUE(trace_partition.ParseFromArray(trace_raw.data() + start, size));
+    merged_trace.MergeFrom(trace_partition);
+  }
+  EXPECT_GE(num_markers, kNumMarkers);
+
+  protos::Trace whole_trace;
+  ASSERT_TRUE(whole_trace.ParseFromString(trace_raw));
+
+  ASSERT_EQ(whole_trace.packet_size(), merged_trace.packet_size());
+  EXPECT_EQ(whole_trace.SerializeAsString(), merged_trace.SerializeAsString());
+}
+
 }  // namespace perfetto