trace writer: Add a proxying StartupTraceWriter.

Adds StartupTraceWriter, which facilitates writing trace events during
early app startup while the connection to the perfetto service is
unavailable. The StartupTraceWriter writes into a lock-guarded
temporary local buffer until it is bound to a SMB-backed TraceWriter.
Once bound, any data written to the local buffer is copied to the SMB.
Future writes then proxy directly to the SMB-backed TraceWriter.

Bug: 122507010
Change-Id: I380eb088f42b050f097a6453c3001322c603d285
diff --git a/Android.bp b/Android.bp
index 82f4b33..8f5d8cb 100644
--- a/Android.bp
+++ b/Android.bp
@@ -88,6 +88,7 @@
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
+    "src/tracing/core/startup_trace_writer.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -273,6 +274,7 @@
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
+    "src/tracing/core/startup_trace_writer.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -412,6 +414,7 @@
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
+    "src/tracing/core/startup_trace_writer.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -608,6 +611,7 @@
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
+    "src/tracing/core/startup_trace_writer.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -2320,6 +2324,7 @@
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
+    "src/tracing/core/startup_trace_writer.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -2649,6 +2654,8 @@
     "src/tracing/core/shared_memory_arbiter_impl_unittest.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/sliced_protobuf_input_stream_unittest.cc",
+    "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_unittest.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
diff --git a/include/perfetto/protozero/message_handle.h b/include/perfetto/protozero/message_handle.h
index 5fbe05b..fd21d2e 100644
--- a/include/perfetto/protozero/message_handle.h
+++ b/include/perfetto/protozero/message_handle.h
@@ -39,6 +39,12 @@
 
 class PERFETTO_EXPORT MessageHandleBase {
  public:
+  class FinalizationListener {
+   public:
+    virtual ~FinalizationListener();
+    virtual void OnMessageFinalized(Message* message) = 0;
+  };
+
   ~MessageHandleBase();
 
   // Move-only type.
@@ -51,6 +57,10 @@
     return !!message_;
   }
 
+  void set_finalization_listener(FinalizationListener* listener) {
+    listener_ = listener;
+  }
+
  protected:
   explicit MessageHandleBase(Message* = nullptr);
   Message* operator->() const {
@@ -66,10 +76,27 @@
   MessageHandleBase(const MessageHandleBase&) = delete;
   MessageHandleBase& operator=(const MessageHandleBase&) = delete;
 
-  void reset_message() { message_ = nullptr; }
+  void reset_message() {
+    // This is called by Message::Finalize().
+    PERFETTO_DCHECK(message_->is_finalized());
+    message_ = nullptr;
+    listener_ = nullptr;
+  }
+
   void Move(MessageHandleBase&&);
 
+  void FinalizeMessage() {
+    // |message_| and |listener_| may be cleared by reset_message() during
+    // Message::Finalize().
+    auto* listener = listener_;
+    auto* message = message_;
+    message->Finalize();
+    if (listener)
+      listener->OnMessageFinalized(message);
+  }
+
   Message* message_;
+  FinalizationListener* listener_ = nullptr;
 #if PERFETTO_DCHECK_IS_ON()
   uint32_t generation_;
 #endif
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index 1320683..b09026c 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -27,6 +27,7 @@
     "shared_memory_abi.h",
     "shared_memory_arbiter.h",
     "slice.h",
+    "startup_trace_writer.h",
     "trace_config.h",
     "trace_packet.h",
     "trace_writer.h",
diff --git a/include/perfetto/tracing/core/shared_memory_abi.h b/include/perfetto/tracing/core/shared_memory_abi.h
index bfcac3b..7a5a14c 100644
--- a/include/perfetto/tracing/core/shared_memory_abi.h
+++ b/include/perfetto/tracing/core/shared_memory_abi.h
@@ -392,6 +392,19 @@
       return packets.count;
     }
 
+    // Increases |packets.count| to the given |packet_count|, but only if
+    // |packet_count| is larger than the current value of |packets.count|.
+    // Returns the new packet count. Same atomicity guarantees as
+    // IncrementPacketCount().
+    uint16_t IncreasePacketCountTo(uint16_t packet_count) {
+      ChunkHeader* chunk_header = header();
+      auto packets = chunk_header->packets.load(std::memory_order_relaxed);
+      if (packets.count < packet_count)
+        packets.count = packet_count;
+      chunk_header->packets.store(packets, std::memory_order_release);
+      return packets.count;
+    }
+
     // Flags are cleared by TryAcquireChunk(), by passing the new header for
     // the chunk.
     void SetFlag(ChunkHeader::Flags flag) {
@@ -518,6 +531,10 @@
 
   std::pair<size_t, size_t> GetPageAndChunkIndex(const Chunk& chunk);
 
+  uint16_t GetChunkSizeForLayout(uint32_t page_layout) const {
+    return chunk_sizes_[(page_layout & kLayoutMask) >> kLayoutShift];
+  }
+
   static ChunkState GetChunkStateFromLayout(uint32_t page_layout,
                                             size_t chunk_idx) {
     return static_cast<ChunkState>((page_layout >> (chunk_idx * kChunkShift)) &
@@ -546,10 +563,6 @@
   SharedMemoryABI(const SharedMemoryABI&) = delete;
   SharedMemoryABI& operator=(const SharedMemoryABI&) = delete;
 
-  uint16_t GetChunkSizeForLayout(uint32_t page_layout) const {
-    return chunk_sizes_[(page_layout & kLayoutMask) >> kLayoutShift];
-  }
-
   Chunk TryAcquireChunk(size_t page_idx,
                         size_t chunk_idx,
                         ChunkState,
diff --git a/include/perfetto/tracing/core/shared_memory_arbiter.h b/include/perfetto/tracing/core/shared_memory_arbiter.h
index 839e9db..e285648 100644
--- a/include/perfetto/tracing/core/shared_memory_arbiter.h
+++ b/include/perfetto/tracing/core/shared_memory_arbiter.h
@@ -34,6 +34,7 @@
 }
 
 class CommitDataRequest;
+class StartupTraceWriter;
 class SharedMemory;
 class TraceWriter;
 
@@ -50,6 +51,15 @@
   virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer) = 0;
 
+  // Binds the provided unbound StartupTraceWriter to a new TraceWriter
+  // associated with the arbiter's SMB. Returns |false| if binding failed
+  // because the writer is concurrently writing data to its temporary buffer. In
+  // this case, the caller should retry (it is free to try again immediately or
+  // schedule a wakeup to retry later).
+  virtual bool BindStartupTraceWriter(StartupTraceWriter* writer,
+                                      BufferID target_buffer)
+      PERFETTO_WARN_UNUSED_RESULT = 0;
+
   // Notifies the service that all data for the given FlushRequestID has been
   // committed in the shared memory buffer.
   virtual void NotifyFlushComplete(FlushRequestID) = 0;
diff --git a/include/perfetto/tracing/core/startup_trace_writer.h b/include/perfetto/tracing/core/startup_trace_writer.h
new file mode 100644
index 0000000..d4d0f36
--- /dev/null
+++ b/include/perfetto/tracing/core/startup_trace_writer.h
@@ -0,0 +1,155 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_H_
+
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "perfetto/base/export.h"
+#include "perfetto/base/optional.h"
+#include "perfetto/base/thread_checker.h"
+#include "perfetto/protozero/message_handle.h"
+#include "perfetto/protozero/scattered_heap_buffer.h"
+#include "perfetto/protozero/scattered_stream_writer.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/trace_writer.h"
+
+namespace perfetto {
+
+class SharedMemoryArbiterImpl;
+
+namespace protos {
+namespace pbzero {
+class TracePacket;
+}  // namespace pbzero
+}  // namespace protos
+
+// Facilitates writing trace events in early phases of an application's startup
+// when the perfetto service is not available yet.
+//
+// Until the service is available, producer threads instantiate an unbound
+// StartupTraceWriter instance and use it to emit trace events. Each writer will
+// record the serialized trace events into a temporary local memory buffer.
+//
+// Once the service is available, the producer binds each StartupTraceWriter to
+// the SMB by calling SharedMemoryArbiter::BindStartupTraceWriter(). The data in
+// the writer's local buffer will then be copied into the SMB and the any future
+// writes will proxy directly to a new SMB-backed TraceWriter.
+//
+// Writing to the temporary local trace buffer is guarded by a lock and flag to
+// allow binding the writer from a different thread. When the writer starts
+// writing data by calling NewTracePacket(), the writer thread acquires the lock
+// to set a flag indicating that a write is in progress. Once the packet is
+// finalized, the flag is reset. To bind the writer, the lock is acquired while
+// the flag is unset and released only once binding completed, thereby blocking
+// the writer thread from starting a write concurrently.
+//
+// While unbound, the writer thread should finalize each TracePacket as soon as
+// possible to ensure that it doesn't block binding the writer.
+class PERFETTO_EXPORT StartupTraceWriter
+    : public TraceWriter,
+      public protozero::MessageHandleBase::FinalizationListener {
+ public:
+  // Create an unbound StartupTraceWriter that can later be bound by calling
+  // BindToTraceWriter().
+  StartupTraceWriter();
+
+  // Create a StartupTraceWriter bound to |trace_writer|. Should only be called
+  // on the writer thread.
+  explicit StartupTraceWriter(std::unique_ptr<TraceWriter> trace_writer);
+
+  ~StartupTraceWriter() override;
+
+  // TraceWriter implementation. These methods should only be called on the
+  // writer thread.
+  TracePacketHandle NewTracePacket() override;
+  void Flush(std::function<void()> callback = {}) override;
+
+  // Note that this will return 0 until the first TracePacket was started after
+  // binding.
+  WriterID writer_id() const override;
+
+  // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
+  // Called by SharedMemoryArbiterImpl::BindStartupTraceWriter().
+  //
+  // This method can be called on any thread. If any data was written locally
+  // before the writer was bound, BindToArbiter() will copy this data into
+  // chunks in the provided target buffer via the SMB. Any future packets will
+  // be directly written into the SMB via a newly obtained TraceWriter from the
+  // arbiter.
+  //
+  // Will fail and return |false| if a concurrent write is in progress.
+  bool BindToArbiter(SharedMemoryArbiterImpl*,
+                     BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
+
+  // Returns |true| if the writer thread has observed that the writer was bound
+  // to an SMB. Should only be called on the writer thread.
+  //
+  // The writer thread can use the return value to determine whether it needs to
+  // finalize the current TracePacket as soon as possible. It is only safe for
+  // the writer to batch data into a single TracePacket over a longer time
+  // period when this returns |true|.
+  bool was_bound() const {
+    PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+    return was_bound_;
+  }
+
+  // Should only be called on the writer thread.
+  size_t used_buffer_size();
+
+ private:
+  // protozero::MessageHandleBase::FinalizationListener implementation.
+  void OnMessageFinalized(protozero::Message* message) override;
+
+  void OnTracePacketCompleted();
+  ChunkID CommitLocalBufferChunks(SharedMemoryArbiterImpl*, WriterID, BufferID);
+
+  PERFETTO_THREAD_CHECKER(writer_thread_checker_)
+
+  // Only set and accessed from the writer thread. The writer thread flips this
+  // bit when it sees that trace_writer_ is set (while holding the lock).
+  // Caching this fact in this variable avoids the need to acquire the lock to
+  // check on later calls to NewTracePacket().
+  bool was_bound_ = false;
+
+  // All variables below this point are protected by |lock_|.
+  std::mutex lock_;
+
+  // Never reset once it is changed from |nullptr|.
+  std::unique_ptr<TraceWriter> trace_writer_ = nullptr;
+
+  // Local memory buffer for trace packets written before the writer is bound.
+  std::unique_ptr<protozero::ScatteredHeapBuffer> memory_buffer_;
+  std::unique_ptr<protozero::ScatteredStreamWriter> memory_stream_writer_;
+
+  std::vector<uint32_t> packet_sizes_;
+  size_t total_payload_size = 0;
+
+  // Whether the writer thread is currently writing a TracePacket.
+  bool write_in_progress_ = false;
+
+  // The packet returned via NewTracePacket() while the writer is unbound. Reset
+  // to |nullptr| once bound. Owned by this class, TracePacketHandle has just a
+  // pointer to it.
+  std::unique_ptr<protos::pbzero::TracePacket> cur_packet_;
+};
+
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_H_
diff --git a/include/perfetto/tracing/core/trace_writer.h b/include/perfetto/tracing/core/trace_writer.h
index 080d01d..ef864ba 100644
--- a/include/perfetto/tracing/core/trace_writer.h
+++ b/include/perfetto/tracing/core/trace_writer.h
@@ -77,6 +77,13 @@
 
   virtual WriterID writer_id() const = 0;
 
+  // Set the id of the first chunk the writer will emit. Returns |false| if not
+  // implemented or if the first chunk was already emitted by the writer.
+  //
+  // StartupTraceWriter will call this if it committed buffered data on
+  // behalf of the TraceWriter.
+  virtual bool SetFirstChunkId(ChunkID);
+
  private:
   TraceWriter(const TraceWriter&) = delete;
   TraceWriter& operator=(const TraceWriter&) = delete;
diff --git a/src/protozero/message_handle.cc b/src/protozero/message_handle.cc
index d8e5ad2..79c402d 100644
--- a/src/protozero/message_handle.cc
+++ b/src/protozero/message_handle.cc
@@ -22,6 +22,8 @@
 
 namespace protozero {
 
+MessageHandleBase::FinalizationListener::~FinalizationListener() {}
+
 MessageHandleBase::MessageHandleBase(Message* message) : message_(message) {
 #if PERFETTO_DCHECK_IS_ON()
   generation_ = message_ ? message->generation_ : 0;
@@ -35,7 +37,7 @@
 #if PERFETTO_DCHECK_IS_ON()
     PERFETTO_DCHECK(generation_ == message_->generation_);
 #endif
-    message_->Finalize();
+    FinalizeMessage();
   }
 }
 
@@ -48,7 +50,7 @@
   // one, finalize the old message. However, if the other message is the same as
   // the one we point to, don't finalize.
   if (message_ && message_ != other.message_)
-    message_->Finalize();
+    FinalizeMessage();
   Move(std::move(other));
   return *this;
 }
@@ -56,6 +58,8 @@
 void MessageHandleBase::Move(MessageHandleBase&& other) {
   message_ = other.message_;
   other.message_ = nullptr;
+  listener_ = other.listener_;
+  other.listener_ = nullptr;
 #if PERFETTO_DCHECK_IS_ON()
   if (message_) {
     generation_ = message_->generation_;
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index b36e073..ae49d44 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -54,6 +54,7 @@
     "core/shared_memory_arbiter_impl.h",
     "core/sliced_protobuf_input_stream.cc",
     "core/sliced_protobuf_input_stream.h",
+    "core/startup_trace_writer.cc",
     "core/sys_stats_config.cc",
     "core/test_config.cc",
     "core/trace_buffer.cc",
@@ -112,7 +113,9 @@
     sources += [
       "core/service_impl_unittest.cc",
       "core/shared_memory_arbiter_impl_unittest.cc",
+      "core/startup_trace_writer_unittest.cc",
       "core/trace_writer_impl_unittest.cc",
+      "test/fake_producer_endpoint.h",
       "test/mock_consumer.cc",
       "test/mock_consumer.h",
       "test/mock_producer.cc",
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index d46730a..e4d0e2d 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -21,6 +21,7 @@
 #include "perfetto/base/time.h"
 #include "perfetto/tracing/core/commit_data_request.h"
 #include "perfetto/tracing/core/shared_memory.h"
+#include "perfetto/tracing/core/startup_trace_writer.h"
 #include "src/tracing/core/null_trace_writer.h"
 #include "src/tracing/core/trace_writer_impl.h"
 
@@ -275,6 +276,11 @@
       new TraceWriterImpl(this, id, target_buffer));
 }
 
+bool SharedMemoryArbiterImpl::BindStartupTraceWriter(StartupTraceWriter* writer,
+                                                     BufferID target_buffer) {
+  return writer->BindToArbiter(this, target_buffer);
+}
+
 void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
   bool should_post_commit_task = false;
   {
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 7507a9a..0ef8abe 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -36,6 +36,7 @@
 class CommitDataRequest;
 class PatchList;
 class TraceWriter;
+class TraceWriterImpl;
 
 namespace base {
 class TaskRunner;
@@ -103,7 +104,9 @@
   // SharedMemoryArbiter implementation.
   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
   std::unique_ptr<TraceWriter> CreateTraceWriter(
-      BufferID target_buffer = 0) override;
+      BufferID target_buffer) override;
+  bool BindStartupTraceWriter(StartupTraceWriter* writer,
+                              BufferID target_buffer) override;
 
   void NotifyFlushComplete(FlushRequestID) override;
 
diff --git a/src/tracing/core/startup_trace_writer.cc b/src/tracing/core/startup_trace_writer.cc
new file mode 100644
index 0000000..473f04a
--- /dev/null
+++ b/src/tracing/core/startup_trace_writer.cc
@@ -0,0 +1,322 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/startup_trace_writer.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "src/tracing/core/patch_list.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
+
+using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
+
+namespace perfetto {
+
+namespace {
+
+SharedMemoryABI::Chunk NewChunk(SharedMemoryArbiterImpl* arbiter,
+                                WriterID writer_id,
+                                ChunkID chunk_id,
+                                bool fragmenting_packet) {
+  ChunkHeader::Packets packets = {};
+  if (fragmenting_packet) {
+    packets.count = 1;
+    packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
+  }
+
+  // The memory order of the stores below doesn't really matter. This |header|
+  // is just a local temporary object. The GetNewChunk() call below will copy it
+  // into the shared buffer with the proper barriers.
+  ChunkHeader header = {};
+  header.writer_id.store(writer_id, std::memory_order_relaxed);
+  header.chunk_id.store(chunk_id, std::memory_order_relaxed);
+  header.packets.store(packets, std::memory_order_relaxed);
+
+  return arbiter->GetNewChunk(header);
+}
+
+class LocalBufferReader {
+ public:
+  LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
+      : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}
+
+  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk, size_t num_bytes) {
+    PERFETTO_CHECK(target_chunk->payload_size() >= num_bytes);
+    uint8_t* chunk_payload = target_chunk->payload_begin();
+    size_t bytes_read = 0;
+    while (bytes_read < num_bytes) {
+      if (cur_slice_ == buffer_slices_.end())
+        return bytes_read;
+
+      auto cur_slice_range = cur_slice_->GetUsedRange();
+
+      if (cur_slice_range.size() == cur_slice_offset_) {
+        cur_slice_offset_ = 0;
+        cur_slice_++;
+        continue;
+      }
+
+      size_t read_size = std::min(num_bytes - bytes_read,
+                                  cur_slice_range.size() - cur_slice_offset_);
+      memcpy(chunk_payload + bytes_read,
+             cur_slice_range.begin + cur_slice_offset_, read_size);
+      cur_slice_offset_ += read_size;
+      bytes_read += read_size;
+
+      // Should have either read all of the chunk or completed reading now.
+      PERFETTO_DCHECK(cur_slice_offset_ == cur_slice_range.size() ||
+                      bytes_read == num_bytes);
+    }
+    return bytes_read;
+  }
+
+ private:
+  const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;
+
+  // Iterator pointing to slice in |buffer_slices_| that we're currently reading
+  // from.
+  std::vector<protozero::ScatteredHeapBuffer::Slice>::const_iterator cur_slice_;
+  // Read offset in the current slice in bytes.
+  size_t cur_slice_offset_ = 0;
+};
+
+}  // namespace
+
+StartupTraceWriter::StartupTraceWriter()
+    : memory_buffer_(new protozero::ScatteredHeapBuffer()),
+      memory_stream_writer_(
+          new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
+  memory_buffer_->set_writer(memory_stream_writer_.get());
+  PERFETTO_DETACH_FROM_THREAD(writer_thread_checker_);
+}
+
+StartupTraceWriter::StartupTraceWriter(
+    std::unique_ptr<TraceWriter> trace_writer)
+    : was_bound_(true), trace_writer_(std::move(trace_writer)) {}
+
+StartupTraceWriter::~StartupTraceWriter() = default;
+
+bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
+                                       BufferID target_buffer) {
+  std::lock_guard<std::mutex> lock(lock_);
+
+  PERFETTO_DCHECK(!trace_writer_);
+
+  // Can't bind while the writer thread is writing.
+  if (write_in_progress_)
+    return false;
+
+  // If there's a pending trace packet, it should have been completed by the
+  // writer thread before write_in_progress_ is reset.
+  if (cur_packet_) {
+    PERFETTO_DCHECK(cur_packet_->is_finalized());
+    cur_packet_.reset();
+  }
+  memory_stream_writer_.reset();
+
+  trace_writer_ = arbiter->CreateTraceWriter(target_buffer);
+
+  ChunkID next_chunk_id = CommitLocalBufferChunks(
+      arbiter, trace_writer_->writer_id(), target_buffer);
+  memory_buffer_.reset();
+
+  // The real TraceWriter should start writing at the subsequent chunk ID.
+  bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
+  PERFETTO_DCHECK(success);
+
+  return true;
+}
+
+TraceWriter::TracePacketHandle StartupTraceWriter::NewTracePacket() {
+  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+
+  // Check if we are already bound without grabbing the lock. This is an
+  // optimization to avoid any locking in the common case where the proxy was
+  // bound some time ago.
+  if (PERFETTO_LIKELY(was_bound_)) {
+    PERFETTO_DCHECK(!cur_packet_);
+    PERFETTO_DCHECK(trace_writer_);
+    return trace_writer_->NewTracePacket();
+  }
+
+  // Now grab the lock and safely check whether we are still unbound.
+  {
+    std::lock_guard<std::mutex> lock(lock_);
+    if (trace_writer_) {
+      PERFETTO_DCHECK(!cur_packet_);
+      // Set the |was_bound_| flag to avoid locking in future calls to
+      // NewTracePacket().
+      was_bound_ = true;
+      return trace_writer_->NewTracePacket();
+    }
+    // Not bound. Make sure it stays this way until the TracePacketHandle goes
+    // out of scope by setting |write_in_progress_|.
+    PERFETTO_DCHECK(!write_in_progress_);
+    write_in_progress_ = true;
+  }
+
+  // Write to the local buffer.
+  if (cur_packet_) {
+    // If we hit this, the caller is calling NewTracePacket() without having
+    // finalized the previous packet.
+    PERFETTO_DCHECK(cur_packet_->is_finalized());
+  } else {
+    cur_packet_.reset(new protos::pbzero::TracePacket());
+  }
+  cur_packet_->Reset(memory_stream_writer_.get());
+  TraceWriter::TracePacketHandle handle(cur_packet_.get());
+  // |this| outlives the packet handle.
+  handle.set_finalization_listener(this);
+  return handle;
+}
+
+void StartupTraceWriter::Flush(std::function<void()> callback) {
+  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+  // It's fine to check |was_bound_| instead of acquiring the lock because
+  // |trace_writer_| will only need flushing after the first trace packet was
+  // written to it and |was_bound_| is set.
+  if (PERFETTO_LIKELY(was_bound_)) {
+    PERFETTO_DCHECK(trace_writer_);
+    return trace_writer_->Flush(std::move(callback));
+  }
+
+  // Can't flush while unbound.
+  callback();
+}
+
+WriterID StartupTraceWriter::writer_id() const {
+  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+  // We can't acquire the lock because this is a const method. So we'll only
+  // proxy to |trace_writer_| once we have written the first packet to it
+  // instead.
+  if (PERFETTO_LIKELY(was_bound_)) {
+    PERFETTO_DCHECK(trace_writer_);
+    return trace_writer_->writer_id();
+  }
+  return 0;
+}
+
+size_t StartupTraceWriter::used_buffer_size() {
+  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+  if (PERFETTO_LIKELY(was_bound_))
+    return 0;
+
+  std::lock_guard<std::mutex> lock(lock_);
+  if (trace_writer_)
+    return 0;
+
+  size_t used_size = 0;
+  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
+  for (const auto& slice : memory_buffer_->slices()) {
+    used_size += slice.GetUsedRange().size();
+  }
+  return used_size;
+}
+
+void StartupTraceWriter::OnMessageFinalized(protozero::Message* message) {
+  PERFETTO_DCHECK(cur_packet_.get() == message);
+  PERFETTO_DCHECK(cur_packet_->is_finalized());
+  // Finalize() is a no-op because the packet is already finalized.
+  uint32_t packet_size = cur_packet_->Finalize();
+  packet_sizes_.push_back(packet_size);
+  total_payload_size += packet_size;
+
+  // Write is complete, reset the flag to allow binding.
+  std::lock_guard<std::mutex> lock(lock_);
+  PERFETTO_DCHECK(write_in_progress_);
+  write_in_progress_ = false;
+}
+
+ChunkID StartupTraceWriter::CommitLocalBufferChunks(
+    SharedMemoryArbiterImpl* arbiter,
+    WriterID writer_id,
+    BufferID target_buffer) {
+  // TODO(eseckler): Write and commit these chunks asynchronously. This would
+  // require that the service is informed of the missing initial chunks, e.g. by
+  // committing our first chunk here before the new trace writer has a chance to
+  // commit its first chunk. Otherwise the service wouldn't know to wait for our
+  // chunks.
+
+  if (packet_sizes_.empty() || !writer_id)
+    return 0;
+
+  LocalBufferReader local_buffer_reader(memory_buffer_.get());
+
+  ChunkID next_chunk_id = 0;
+  SharedMemoryABI::Chunk cur_chunk =
+      NewChunk(arbiter, writer_id, next_chunk_id++, false);
+
+  size_t max_payload_size = cur_chunk.payload_size();
+  size_t cur_payload_size = 0;
+  uint16_t cur_num_packets = 0;
+  size_t total_num_packets = packet_sizes_.size();
+  PatchList empty_patch_list;
+  for (size_t packet_idx = 0; packet_idx < total_num_packets; packet_idx++) {
+    uint32_t packet_size = packet_sizes_[packet_idx];
+    uint32_t remaining_packet_size = packet_size;
+    ++cur_num_packets;
+    do {
+      uint32_t fragment_size = static_cast<uint32_t>(
+          std::min(static_cast<size_t>(remaining_packet_size),
+                   max_payload_size - cur_payload_size));
+      cur_payload_size += fragment_size;
+      remaining_packet_size -= fragment_size;
+
+      bool last_write =
+          packet_idx == total_num_packets - 1 && remaining_packet_size == 0;
+
+      // Find num_packets that we should copy into current chunk and their
+      // payload_size.
+      bool write_chunk = cur_num_packets == ChunkHeader::Packets::kMaxCount ||
+                         cur_payload_size == max_payload_size || last_write;
+
+      if (write_chunk) {
+        // Write chunk payload.
+        local_buffer_reader.ReadBytes(&cur_chunk, cur_payload_size);
+
+        auto new_packet_count =
+            cur_chunk.IncreasePacketCountTo(cur_num_packets);
+        PERFETTO_DCHECK(new_packet_count == cur_num_packets);
+
+        bool is_fragmenting = remaining_packet_size > 0;
+        if (is_fragmenting)
+          cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
+
+        arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
+                                      &empty_patch_list);
+
+        // Avoid creating a new chunk after the last write.
+        if (!last_write) {
+          cur_chunk =
+              NewChunk(arbiter, writer_id, next_chunk_id++, is_fragmenting);
+          max_payload_size = cur_chunk.payload_size();
+          cur_payload_size = 0;
+          cur_num_packets = is_fragmenting ? 1 : 0;
+        } else {
+          PERFETTO_DCHECK(!is_fragmenting);
+        }
+      }
+    } while (remaining_packet_size > 0);
+  }
+
+  // The last chunk should have been returned.
+  PERFETTO_DCHECK(!cur_chunk.is_valid());
+
+  return next_chunk_id;
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/startup_trace_writer_unittest.cc b/src/tracing/core/startup_trace_writer_unittest.cc
new file mode 100644
index 0000000..7a6d973
--- /dev/null
+++ b/src/tracing/core/startup_trace_writer_unittest.cc
@@ -0,0 +1,218 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/startup_trace_writer.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/tracing/core/tracing_service.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
+#include "src/tracing/test/aligned_buffer_test.h"
+#include "src/tracing/test/fake_producer_endpoint.h"
+
+#include "perfetto/trace/test_event.pbzero.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace {
+
+class StartupTraceWriterTest : public AlignedBufferTest {
+ public:
+  void SetUp() override {
+    SharedMemoryArbiterImpl::set_default_layout_for_testing(
+        SharedMemoryABI::PageLayout::kPageDiv4);
+    AlignedBufferTest::SetUp();
+    task_runner_.reset(new base::TestTaskRunner());
+    arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
+                                               &fake_producer_endpoint_,
+                                               task_runner_.get()));
+  }
+
+  void TearDown() override {
+    arbiter_.reset();
+    task_runner_.reset();
+  }
+
+  void WritePackets(StartupTraceWriter* writer, size_t packet_count) {
+    for (size_t i = 0; i < packet_count; i++) {
+      auto packet = writer->NewTracePacket();
+      packet->set_for_testing()->set_str("foo");
+    }
+  }
+
+  void VerifyPacketCount(size_t expected_count) {
+    SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+    size_t packets_count = 0;
+    ChunkID current_max_chunk_id = 0;
+    for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
+      uint32_t page_layout = abi->GetPageLayout(page_idx);
+      size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
+      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+        auto chunk_state = abi->GetChunkState(page_idx, chunk_idx);
+        ASSERT_TRUE(chunk_state == SharedMemoryABI::kChunkFree ||
+                    chunk_state == SharedMemoryABI::kChunkComplete);
+        auto chunk = abi->TryAcquireChunkForReading(page_idx, chunk_idx);
+        if (!chunk.is_valid())
+          continue;
+
+        // Should only see new chunks with IDs larger than the previous read
+        // since our reads and writes are serialized.
+        ChunkID chunk_id = chunk.header()->chunk_id.load();
+        if (last_read_max_chunk_id_ != 0)
+          EXPECT_LT(last_read_max_chunk_id_, chunk_id);
+        current_max_chunk_id = std::max(current_max_chunk_id, chunk_id);
+
+        auto packets_header = chunk.header()->packets.load();
+        packets_count += packets_header.count;
+        if (packets_header.flags &
+            SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk) {
+          // Don't count fragmented packets twice.
+          packets_count--;
+        }
+        abi->ReleaseChunkAsFree(std::move(chunk));
+      }
+    }
+    last_read_max_chunk_id_ = current_max_chunk_id;
+    EXPECT_EQ(expected_count, packets_count);
+  }
+
+  FakeProducerEndpoint fake_producer_endpoint_;
+  std::unique_ptr<base::TestTaskRunner> task_runner_;
+  std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
+  std::function<void(const std::vector<uint32_t>&)> on_pages_complete_;
+
+  ChunkID last_read_max_chunk_id_ = 0;
+};
+
+size_t const kPageSizes[] = {4096, 65536};
+INSTANTIATE_TEST_CASE_P(PageSize,
+                        StartupTraceWriterTest,
+                        ::testing::ValuesIn(kPageSizes));
+
+TEST_P(StartupTraceWriterTest, CreateUnboundAndBind) {
+  // Create an unbound writer.
+  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+  // Bind it right away without having written any data before.
+  const BufferID kBufId = 42;
+  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+
+  const size_t kNumPackets = 32;
+  WritePackets(writer.get(), kNumPackets);
+  // Finalizes the last packet and returns the chunk.
+  writer.reset();
+
+  VerifyPacketCount(kNumPackets);
+}
+
+TEST_P(StartupTraceWriterTest, CreateBound) {
+  // Create a bound writer immediately.
+  const BufferID kBufId = 42;
+  std::unique_ptr<StartupTraceWriter> writer(
+      new StartupTraceWriter(arbiter_->CreateTraceWriter(kBufId)));
+
+  const size_t kNumPackets = 32;
+  WritePackets(writer.get(), kNumPackets);
+  // Finalizes the last packet and returns the chunk.
+  writer.reset();
+
+  VerifyPacketCount(kNumPackets);
+}
+
+TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndDiscard) {
+  // Create an unbound writer.
+  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+  const size_t kNumPackets = 32;
+  WritePackets(writer.get(), kNumPackets);
+
+  // Should discard the written data.
+  writer.reset();
+
+  VerifyPacketCount(0);
+}
+
+TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndBind) {
+  // Create an unbound writer.
+  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+  const size_t kNumPackets = 32;
+  WritePackets(writer.get(), kNumPackets);
+
+  // Binding the writer should cause the previously written packets to be
+  // written to the SMB and committed.
+  const BufferID kBufId = 42;
+  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+
+  VerifyPacketCount(kNumPackets);
+
+  // Any further packets should be written to the SMB directly.
+  const size_t kNumAdditionalPackets = 16;
+  WritePackets(writer.get(), kNumAdditionalPackets);
+  // Finalizes the last packet and returns the chunk.
+  writer.reset();
+
+  VerifyPacketCount(kNumAdditionalPackets);
+}
+
+TEST_P(StartupTraceWriterTest, WriteMultipleChunksWhileUnboundAndBind) {
+  // Create an unbound writer.
+  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+  // Write a single packet to determine its size in the buffer.
+  WritePackets(writer.get(), 1);
+  size_t packet_size = writer->used_buffer_size();
+
+  // Write at least 3 pages worth of packets.
+  const size_t kNumPackets = (page_size() * 3 + packet_size - 1) / packet_size;
+  WritePackets(writer.get(), kNumPackets);
+
+  // Binding the writer should cause the previously written packets to be
+  // written to the SMB and committed.
+  const BufferID kBufId = 42;
+  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+
+  VerifyPacketCount(kNumPackets + 1);
+
+  // Any further packets should be written to the SMB directly.
+  const size_t kNumAdditionalPackets = 16;
+  WritePackets(writer.get(), kNumAdditionalPackets);
+  // Finalizes the last packet and returns the chunk.
+  writer.reset();
+
+  VerifyPacketCount(kNumAdditionalPackets);
+}
+
+TEST_P(StartupTraceWriterTest, BindingWhileWritingFails) {
+  // Create an unbound writer.
+  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+  const BufferID kBufId = 42;
+  {
+    // Begin a write by opening a TracePacket
+    auto packet = writer->NewTracePacket();
+
+    // Binding while writing should fail.
+    EXPECT_FALSE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  }
+
+  // Packet was completed, so binding should work now and emit the packet.
+  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  VerifyPacketCount(1);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index 840588d..fcc29bc 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -220,8 +220,19 @@
   return id_;
 }
 
-// Base class ctor/dtor definition.
+bool TraceWriterImpl::SetFirstChunkId(ChunkID chunk_id) {
+  if (next_chunk_id_ > 0)
+    return false;
+  next_chunk_id_ = chunk_id;
+  return true;
+}
+
+// Base class definitions.
 TraceWriter::TraceWriter() = default;
 TraceWriter::~TraceWriter() = default;
 
+bool TraceWriter::SetFirstChunkId(ChunkID) {
+  return false;
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.h b/src/tracing/core/trace_writer_impl.h
index 80a3ffe..9e0cdb4 100644
--- a/src/tracing/core/trace_writer_impl.h
+++ b/src/tracing/core/trace_writer_impl.h
@@ -40,6 +40,7 @@
   TracePacketHandle NewTracePacket() override;
   void Flush(std::function<void()> callback = {}) override;
   WriterID writer_id() const override;
+  bool SetFirstChunkId(ChunkID) override;
 
   void ResetChunkForTesting() { cur_chunk_ = SharedMemoryABI::Chunk(); }
 
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index bf56401..1194052 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -24,6 +24,7 @@
 #include "src/base/test/test_task_runner.h"
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
 #include "src/tracing/test/aligned_buffer_test.h"
+#include "src/tracing/test/fake_producer_endpoint.h"
 
 #include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
@@ -31,26 +32,6 @@
 namespace perfetto {
 namespace {
 
-class FakeProducerEndpoint : public TracingService::ProducerEndpoint {
- public:
-  void RegisterDataSource(const DataSourceDescriptor&) override {}
-  void UnregisterDataSource(const std::string&) override {}
-  void RegisterTraceWriter(uint32_t, uint32_t) override {}
-  void UnregisterTraceWriter(uint32_t) override {}
-  void CommitData(const CommitDataRequest& req, CommitDataCallback) override {
-    last_commit_data_request = req;
-  }
-  void NotifyFlushComplete(FlushRequestID) override {}
-  void NotifyDataSourceStopped(DataSourceInstanceID) override {}
-  SharedMemory* shared_memory() const override { return nullptr; }
-  size_t shared_buffer_page_size_kb() const override { return 0; }
-  std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
-    return nullptr;
-  }
-
-  CommitDataRequest last_commit_data_request;
-};
-
 class TraceWriterImplTest : public AlignedBufferTest {
  public:
   void SetUp() override {
diff --git a/src/tracing/test/fake_producer_endpoint.h b/src/tracing/test/fake_producer_endpoint.h
new file mode 100644
index 0000000..478ffe8
--- /dev/null
+++ b/src/tracing/test/fake_producer_endpoint.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_TEST_FAKE_PRODUCER_ENDPOINT_H_
+#define SRC_TRACING_TEST_FAKE_PRODUCER_ENDPOINT_H_
+
+#include "perfetto/tracing/core/commit_data_request.h"
+#include "perfetto/tracing/core/tracing_service.h"
+
+namespace perfetto {
+
+class FakeProducerEndpoint : public TracingService::ProducerEndpoint {
+ public:
+  void RegisterDataSource(const DataSourceDescriptor&) override {}
+  void UnregisterDataSource(const std::string&) override {}
+  void RegisterTraceWriter(uint32_t, uint32_t) override {}
+  void UnregisterTraceWriter(uint32_t) override {}
+  void CommitData(const CommitDataRequest& req, CommitDataCallback) override {
+    last_commit_data_request = req;
+  }
+  void NotifyFlushComplete(FlushRequestID) override {}
+  void NotifyDataSourceStopped(DataSourceInstanceID) override {}
+  SharedMemory* shared_memory() const override { return nullptr; }
+  size_t shared_buffer_page_size_kb() const override { return 0; }
+  std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
+    return nullptr;
+  }
+
+  CommitDataRequest last_commit_data_request;
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACING_TEST_FAKE_PRODUCER_ENDPOINT_H_