Add support for periodic flushes

Adds a new |fush_period_ms| field to the TraceConfig.
When set, it causes the trace service to issue periodic
flush requests to all data sources.

Bug: 73886018
Change-Id: Ie40fa83b811c8115cf27fd1034262610ca8b4a81
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 77d49e5..e556aa4 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -563,6 +563,60 @@
                         Property(&protos::TestEvent::str, Eq("payload")))));
 }
 
+TEST_F(TracingServiceImplTest, PeriodicFlush) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  trace_config.set_flush_period_ms(1);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  std::unique_ptr<TraceWriter> writer =
+      producer->CreateTraceWriter("data_source");
+
+  const int kNumFlushes = 3;
+  auto checkpoint = task_runner.CreateCheckpoint("all_flushes_done");
+  int flushes_seen = 0;
+  EXPECT_CALL(*producer, Flush(_, _, _))
+      .WillRepeatedly(Invoke([&producer, &writer, &flushes_seen, checkpoint](
+                                 FlushRequestID flush_req_id,
+                                 const DataSourceInstanceID*, size_t) {
+        {
+          auto tp = writer->NewTracePacket();
+          char payload[32];
+          sprintf(payload, "f_%d", flushes_seen);
+          tp->set_for_testing()->set_str(payload);
+        }
+        writer->Flush();
+        producer->endpoint()->NotifyFlushComplete(flush_req_id);
+        if (++flushes_seen == kNumFlushes)
+          checkpoint();
+      }));
+  task_runner.RunUntilCheckpoint("all_flushes_done");
+
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+  auto trace_packets = consumer->ReadBuffers();
+  for (int i = 0; i < kNumFlushes; i++) {
+    EXPECT_THAT(trace_packets,
+                Contains(Property(&protos::TracePacket::for_testing,
+                                  Property(&protos::TestEvent::str,
+                                           Eq("f_" + std::to_string(i))))));
+  }
+}
+
 // Creates a tracing session where some of the data sources set the
 // |will_notify_on_stop| flag and checks that the OnTracingDisabled notification
 // to the consumer is delayed until the acks are received.
diff --git a/src/tracing/core/trace_config.cc b/src/tracing/core/trace_config.cc
index 0b616f4..102a1bd 100644
--- a/src/tracing/core/trace_config.cc
+++ b/src/tracing/core/trace_config.cc
@@ -97,6 +97,11 @@
                 "size mismatch");
   deferred_start_ =
       static_cast<decltype(deferred_start_)>(proto.deferred_start());
+
+  static_assert(sizeof(flush_period_ms_) == sizeof(proto.flush_period_ms()),
+                "size mismatch");
+  flush_period_ms_ =
+      static_cast<decltype(flush_period_ms_)>(proto.flush_period_ms());
   unknown_fields_ = proto.unknown_fields();
 }
 
@@ -162,6 +167,11 @@
                 "size mismatch");
   proto->set_deferred_start(
       static_cast<decltype(proto->deferred_start())>(deferred_start_));
+
+  static_assert(sizeof(flush_period_ms_) == sizeof(proto->flush_period_ms()),
+                "size mismatch");
+  proto->set_flush_period_ms(
+      static_cast<decltype(proto->flush_period_ms())>(flush_period_ms_));
   *(proto->mutable_unknown_fields()) = unknown_fields_;
 }
 
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index 0950fcd..030755b 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -419,6 +419,10 @@
         tracing_session->delay_to_next_write_period_ms());
   }
 
+  // Start the periodic flush tasks if the config specified a flush period.
+  if (tracing_session->config.flush_period_ms())
+    PeriodicFlushTask(tsid, /*post_next_only=*/true);
+
   for (const auto& kv : tracing_session->data_source_instances) {
     ProducerID producer_id = kv.first;
     const DataSourceInstance& data_source = kv.second;
@@ -665,6 +669,31 @@
   });
 }
 
+void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
+                                           bool post_next_only) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  TracingSession* tracing_session = GetTracingSession(tsid);
+  if (!tracing_session || tracing_session->state != TracingSession::STARTED)
+    return;
+
+  uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, tsid] {
+        if (weak_this)
+          weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
+      },
+      flush_period_ms - (base::GetWallTimeMs().count() % flush_period_ms));
+
+  if (post_next_only)
+    return;
+
+  Flush(tsid, kFlushTimeoutMs, [](bool success) {
+    if (!success)
+      PERFETTO_ELOG("Periodic flush timed out");
+  });
+}
+
 // Note: when this is called to write into a file passed when starting tracing
 // |consumer| will be == nullptr (as opposite to the case of a consumer asking
 // to send the trace data back over IPC).
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index 685bad6..d2f97ae 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -311,6 +311,7 @@
   void OnFlushTimeout(TracingSessionID, FlushRequestID);
   void OnDisableTracingTimeout(TracingSessionID);
   void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
+  void PeriodicFlushTask(TracingSessionID, bool post_next_only);
   TraceBuffer* GetBufferByID(BufferID);
 
   base::TaskRunner* const task_runner_;