trace writer: Send patches after completing fragmented packet

For the service's scraping of incomplete chunks to be useful, we
should ensure that previously completed chunks are patched as soon as
possible. Otherwise, the incomplete chunk's data can't be used by the
service because the prior chunk is still awaiting patches.

Thus, send the patches for previous chunks as soon as a fragmented
packet is completed.
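
In sketch form (this mirrors the change to
TraceWriterImpl::NewTracePacket() in the diff below):

  // Before starting a new packet: if the head of the patch list has
  // been patched, flush the completed patches to the service.
  if (!patch_list_.empty() && patch_list_.front().is_patched()) {
    shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
  }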

Bug: 73828976
Change-Id: Ia5879f47f2662061695e7a9a44da16ab4d1bef33
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index 06d8112..d46730a 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -133,40 +133,63 @@
 void SharedMemoryArbiterImpl::ReturnCompletedChunk(Chunk chunk,
                                                    BufferID target_buffer,
                                                    PatchList* patch_list) {
+  PERFETTO_DCHECK(chunk.is_valid());
+  const WriterID writer_id = chunk.writer_id();
+  UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
+                          patch_list);
+}
+
+void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
+                                          BufferID target_buffer,
+                                          PatchList* patch_list) {
+  PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
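+  // A default-constructed (invalid) Chunk makes this a patch-only request;
+  // see the note in UpdateCommitDataRequest().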
+  UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
+}
+
+void SharedMemoryArbiterImpl::UpdateCommitDataRequest(Chunk chunk,
+                                                      WriterID writer_id,
+                                                      BufferID target_buffer,
+                                                      PatchList* patch_list) {
+  // Note: |chunk| will be invalid if the call came from SendPatches().
   bool should_post_callback = false;
   bool should_commit_synchronously = false;
   base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
   {
     std::lock_guard<std::mutex> scoped_lock(lock_);
-    uint8_t chunk_idx = chunk.chunk_idx();
-    const WriterID writer_id = chunk.writer_id();
-    bytes_pending_commit_ += chunk.size();
-    size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
-
-    // DO NOT access |chunk| after this point, has been std::move()-d above.
 
     if (!commit_data_req_) {
       commit_data_req_.reset(new CommitDataRequest());
       weak_this = weak_ptr_factory_.GetWeakPtr();
       should_post_callback = true;
     }
-    CommitDataRequest::ChunksToMove* ctm =
-        commit_data_req_->add_chunks_to_move();
-    ctm->set_page(static_cast<uint32_t>(page_idx));
-    ctm->set_chunk(chunk_idx);
-    ctm->set_target_buffer(target_buffer);
 
-    // If more than half of the SMB.size() is filled with completed chunks for
-    // which we haven't notified the service yet (i.e. they are still enqueued
-    // in |commit_data_req_|), force a synchronous CommitDataRequest(), to
-    // reduce the likeliness of stalling the writer.
-    if (bytes_pending_commit_ >= shmem_abi_.size() / 2) {
-      should_commit_synchronously = true;
-      should_post_callback = false;
+    // If a valid chunk is specified, release it back to the ABI as complete
+    // and attach it to the request.
+    if (chunk.is_valid()) {
+      PERFETTO_DCHECK(chunk.writer_id() == writer_id);
+      uint8_t chunk_idx = chunk.chunk_idx();
+      bytes_pending_commit_ += chunk.size();
+      size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
+
+      // DO NOT access |chunk| after this point; it has been std::move()-d
+      // above.
+
+      CommitDataRequest::ChunksToMove* ctm =
+          commit_data_req_->add_chunks_to_move();
+      ctm->set_page(static_cast<uint32_t>(page_idx));
+      ctm->set_chunk(chunk_idx);
+      ctm->set_target_buffer(target_buffer);
+
+      // If more than half of the SMB.size() is filled with completed chunks for
+      // which we haven't notified the service yet (i.e. they are still enqueued
+      // in |commit_data_req_|), force a synchronous CommitDataRequest(), to
+      // reduce the likelihood of stalling the writer.
+      if (bytes_pending_commit_ >= shmem_abi_.size() / 2) {
+        should_commit_synchronously = true;
+        should_post_callback = false;
+      }
     }
 
-    // Get the patches completed for the previous chunk from the |patch_list|
-    // and update it.
+    // Get the completed patches for previous chunks from the |patch_list|
+    // and attach them.
     ChunkID last_chunk_id = 0;  // 0 is irrelevant but keeps the compiler happy.
     CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr;
     while (!patch_list->empty() && patch_list->front().is_patched()) {
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 0c1b5ac..7507a9a 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -83,6 +83,13 @@
                             BufferID target_buffer,
                             PatchList*);
 
+  // Sends a request to the service to apply completed patches from
+  // |patch_list|. |writer_id| is the ID of the TraceWriter that calls this
+  // method; |target_buffer| is the global trace buffer ID of its target
+  // buffer.
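+  // Typically called by TraceWriterImpl from NewTracePacket() once the
+  // pending patches for a previously returned chunk have been completed.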
+  void SendPatches(WriterID writer_id,
+                   BufferID target_buffer,
+                   PatchList* patch_list);
+
   // Forces a synchronous commit of the completed packets without waiting for
   // the next task.
   void FlushPendingCommitDataRequests(std::function<void()> callback = {});
@@ -108,6 +115,11 @@
   SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete;
   SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete;
 
+  void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk,
+                               WriterID writer_id,
+                               BufferID target_buffer,
+                               PatchList* patch_list);
+
   // Called by the TraceWriter destructor.
   void ReleaseWriterID(WriterID);
 
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index 33644be..4a5e913 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -93,8 +93,17 @@
   // It doesn't make sense to begin a packet that is going to fragment
   // immediately after (8 is just an arbitrary estimation on the minimum size of
   // a realistic packet).
-  if (protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8)
+  if (protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8) {
     protobuf_stream_writer_.Reset(GetNewBuffer());
+  }
+
+  // Send any completed patches to the service to facilitate trace data
+  // recovery. This should only happen when we're completing the first
+  // packet in a chunk that continues from the previous chunk, i.e. at
+  // most once per chunk.
+  if (!patch_list_.empty() && patch_list_.front().is_patched()) {
+    shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
+  }
 
   cur_packet_->Reset(&protobuf_stream_writer_);
   uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index 2b5197b..f9a9230 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -32,11 +32,14 @@
 namespace {
 
 class FakeProducerEndpoint : public TracingService::ProducerEndpoint {
+ public:
   void RegisterDataSource(const DataSourceDescriptor&) override {}
   void UnregisterDataSource(const std::string&) override {}
   void RegisterTraceWriter(uint32_t, uint32_t) override {}
   void UnregisterTraceWriter(uint32_t) override {}
-  void CommitData(const CommitDataRequest&, CommitDataCallback) override {}
+  void CommitData(const CommitDataRequest& req, CommitDataCallback) override {
+    last_commit_data_request = req;
+  }
   void NotifyFlushComplete(FlushRequestID) override {}
   void NotifyDataSourceStopped(DataSourceInstanceID) override {}
   SharedMemory* shared_memory() const override { return nullptr; }
@@ -44,6 +47,8 @@
   std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
     return nullptr;
   }
+
+  CommitDataRequest last_commit_data_request;
 };
 
 class TraceWriterImplTest : public AlignedBufferTest {
@@ -121,13 +126,22 @@
   std::string large_string = large_string_writer.str();
   packet->set_for_testing()->set_str(large_string.data(), large_string.size());
 
+  // First chunk should be committed.
+  arbiter_->FlushPendingCommitDataRequests();
+  const auto& last_commit = fake_producer_endpoint_.last_commit_data_request;
+  ASSERT_EQ(1, last_commit.chunks_to_move_size());
+  EXPECT_EQ(0u, last_commit.chunks_to_move()[0].page());
+  EXPECT_EQ(0u, last_commit.chunks_to_move()[0].chunk());
+  EXPECT_EQ(kBufId, last_commit.chunks_to_move()[0].target_buffer());
+  EXPECT_EQ(0, last_commit.chunks_to_patch_size());
+
   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
 
   // The first allocated chunk should be complete but need patching, since the
   // packet extended past the chunk and no patches for the packet size or string
   // field size were applied yet.
-  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0, 0));
-  auto chunk = abi->TryAcquireChunkForReading(0, 0);
+  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
+  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
   ASSERT_TRUE(chunk.is_valid());
   ASSERT_EQ(1, chunk.header()->packets.load().count);
   ASSERT_TRUE(chunk.header()->packets.load().flags &
@@ -135,8 +149,18 @@
   ASSERT_TRUE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
 
-  // TODO(eseckler): Also verify that the next commit contains patch entries for
-  // the packet size and string field size.
+  // Starting a new packet should cause the completed patches to be sent.
+  packet->Finalize();
+  auto packet2 = writer->NewTracePacket();
+  arbiter_->FlushPendingCommitDataRequests();
+  EXPECT_EQ(0, last_commit.chunks_to_move_size());
+  ASSERT_EQ(1, last_commit.chunks_to_patch_size());
+  EXPECT_EQ(writer->writer_id(), last_commit.chunks_to_patch()[0].writer_id());
+  EXPECT_EQ(kBufId, last_commit.chunks_to_patch()[0].target_buffer());
+  EXPECT_EQ(chunk.header()->chunk_id.load(),
+            last_commit.chunks_to_patch()[0].chunk_id());
+  EXPECT_FALSE(last_commit.chunks_to_patch()[0].has_more_patches());
+  ASSERT_EQ(1, last_commit.chunks_to_patch()[0].patches_size());
 }
 
 // TODO(primiano): add multi-writer test.