trace writer: Add StartupTraceWriterRegistry.

Adds a registry for StartupTraceWriters that keeps track of any unbound
writers and binds them once the SMB is available.

Also fixes a few bugs in the StartupTraceWriter implementation.

Bug: 122507010
Change-Id: Id8de101d9aa4132467d67be2ded3d30b6b8fd52a
diff --git a/Android.bp b/Android.bp
index 8de77cf..f3df9e5 100644
--- a/Android.bp
+++ b/Android.bp
@@ -89,6 +89,7 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -281,6 +282,7 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -426,6 +428,7 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -624,6 +627,7 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -2348,6 +2352,7 @@
     "src/tracing/core/shared_memory_arbiter_impl.cc",
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
     "src/tracing/core/trace_buffer.cc",
@@ -2681,6 +2686,7 @@
     "src/tracing/core/sliced_protobuf_input_stream.cc",
     "src/tracing/core/sliced_protobuf_input_stream_unittest.cc",
     "src/tracing/core/startup_trace_writer.cc",
+    "src/tracing/core/startup_trace_writer_registry.cc",
     "src/tracing/core/startup_trace_writer_unittest.cc",
     "src/tracing/core/sys_stats_config.cc",
     "src/tracing/core/test_config.cc",
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index c4e7eed..d9e375c 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -28,6 +28,7 @@
     "shared_memory_arbiter.h",
     "slice.h",
     "startup_trace_writer.h",
+    "startup_trace_writer_registry.h",
     "trace_config.h",
     "trace_packet.h",
     "trace_stats.h",
diff --git a/include/perfetto/tracing/core/shared_memory_arbiter.h b/include/perfetto/tracing/core/shared_memory_arbiter.h
index e285648..5dc4c03 100644
--- a/include/perfetto/tracing/core/shared_memory_arbiter.h
+++ b/include/perfetto/tracing/core/shared_memory_arbiter.h
@@ -35,6 +35,7 @@
 
 class CommitDataRequest;
 class StartupTraceWriter;
+class StartupTraceWriterRegistry;
 class SharedMemory;
 class TraceWriter;
 
@@ -51,14 +52,24 @@
   virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer) = 0;
 
-  // Binds the provided unbound StartupTraceWriter to a new TraceWriter
-  // associated with the arbiter's SMB. Returns |false| if binding failed
-  // because the writer is concurrently writing data to its temporary buffer. In
-  // this case, the caller should retry (it is free to try again immediately or
-  // schedule a wakeup to retry later).
-  virtual bool BindStartupTraceWriter(StartupTraceWriter* writer,
-                                      BufferID target_buffer)
-      PERFETTO_WARN_UNUSED_RESULT = 0;
+  // Binds the provided unbound StartupTraceWriterRegistry to the arbiter's SMB.
+  // Normally this happens when the perfetto service has been initialized and we
+  // want to rebind all the writers created in the early startup phase.
+  //
+  // All StartupTraceWriters created by the registry are bound to the arbiter
+  // and the given target buffer. The writers may not be bound immediately if
+  // they are concurrently being written to. The registry will retry on the
+  // arbiter's TaskRunner until all writers were bound successfully.
+  //
+  // Should only be called on the passed TaskRunner's sequence. By calling this
+  // method, the registry's ownership is transferred to the arbiter. The arbiter
+  // will delete the registry once all writers were bound.
+  //
+  // TODO(eseckler): Make target buffer assignment more flexible (i.e. per
+  // writer). For now, embedders can use multiple registries instead.
+  virtual void BindStartupTraceWriterRegistry(
+      std::unique_ptr<StartupTraceWriterRegistry>,
+      BufferID target_buffer) = 0;
 
   // Notifies the service that all data for the given FlushRequestID has been
   // committed in the shared memory buffer.
diff --git a/include/perfetto/tracing/core/startup_trace_writer.h b/include/perfetto/tracing/core/startup_trace_writer.h
index b93e2f1..d1ad82c 100644
--- a/include/perfetto/tracing/core/startup_trace_writer.h
+++ b/include/perfetto/tracing/core/startup_trace_writer.h
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <mutex>
+#include <set>
 #include <vector>
 
 #include "perfetto/base/export.h"
@@ -33,6 +34,7 @@
 namespace perfetto {
 
 class SharedMemoryArbiterImpl;
+class StartupTraceWriterRegistryHandle;
 
 namespace protos {
 namespace pbzero {
@@ -44,8 +46,9 @@
 // when the perfetto service is not available yet.
 //
 // Until the service is available, producer threads instantiate an unbound
-// StartupTraceWriter instance and use it to emit trace events. Each writer will
-// record the serialized trace events into a temporary local memory buffer.
+// StartupTraceWriter instance (via a StartupTraceWriterRegistry) and use it to
+// emit trace events. Each writer will record the serialized trace events into a
+// temporary local memory buffer.
 //
 // Once the service is available, the producer binds each StartupTraceWriter to
 // the SMB by calling SharedMemoryArbiter::BindStartupTraceWriter(). The data in
@@ -66,10 +69,6 @@
     : public TraceWriter,
       public protozero::MessageHandleBase::FinalizationListener {
  public:
-  // Create an unbound StartupTraceWriter that can later be bound by calling
-  // BindToTraceWriter().
-  StartupTraceWriter();
-
   // Create a StartupTraceWriter bound to |trace_writer|. Should only be called
   // on the writer thread.
   explicit StartupTraceWriter(std::unique_ptr<TraceWriter> trace_writer);
@@ -87,19 +86,6 @@
 
   uint64_t written() const override;
 
-  // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
-  // Called by SharedMemoryArbiterImpl::BindStartupTraceWriter().
-  //
-  // This method can be called on any thread. If any data was written locally
-  // before the writer was bound, BindToArbiter() will copy this data into
-  // chunks in the provided target buffer via the SMB. Any future packets will
-  // be directly written into the SMB via a newly obtained TraceWriter from the
-  // arbiter.
-  //
-  // Will fail and return |false| if a concurrent write is in progress.
-  bool BindToArbiter(SharedMemoryArbiterImpl*,
-                     BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
-
   // Returns |true| if the writer thread has observed that the writer was bound
   // to an SMB. Should only be called on the writer thread.
   //
@@ -116,6 +102,31 @@
   size_t used_buffer_size();
 
  private:
+  friend class StartupTraceWriterRegistry;
+  friend class StartupTraceWriterTest;
+
+  // Create an unbound StartupTraceWriter associated with the registry pointed
+  // to by the handle. The writer can later be bound by calling
+  // BindToTraceWriter(). The registry handle may be nullptr in tests.
+  StartupTraceWriter(std::shared_ptr<StartupTraceWriterRegistryHandle>);
+
+  StartupTraceWriter(const StartupTraceWriter&) = delete;
+  StartupTraceWriter& operator=(const StartupTraceWriter&) = delete;
+
+  // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
+  // Called by StartupTraceWriterRegistry::BindToArbiter().
+  //
+  // This method can be called on any thread. If any data was written locally
+  // before the writer was bound, BindToArbiter() will copy this data into
+  // chunks in the provided target buffer via the SMB. Any future packets will
+  // be directly written into the SMB via a newly obtained TraceWriter from the
+  // arbiter.
+  //
+  // Will fail and return |false| if a concurrent write is in progress. Returns
+  // |true| if successfully bound and should then not be called again.
+  bool BindToArbiter(SharedMemoryArbiterImpl*,
+                     BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
+
   // protozero::MessageHandleBase::FinalizationListener implementation.
   void OnMessageFinalized(protozero::Message* message) override;
 
@@ -124,6 +135,8 @@
 
   PERFETTO_THREAD_CHECKER(writer_thread_checker_)
 
+  std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle_;
+
   // Only set and accessed from the writer thread. The writer thread flips this
   // bit when it sees that trace_writer_ is set (while holding the lock).
   // Caching this fact in this variable avoids the need to acquire the lock to
@@ -141,7 +154,6 @@
   std::unique_ptr<protozero::ScatteredStreamWriter> memory_stream_writer_;
 
   std::vector<uint32_t> packet_sizes_;
-  size_t total_payload_size = 0;
 
   // Whether the writer thread is currently writing a TracePacket.
   bool write_in_progress_ = false;
diff --git a/include/perfetto/tracing/core/startup_trace_writer_registry.h b/include/perfetto/tracing/core/startup_trace_writer_registry.h
new file mode 100644
index 0000000..db19862
--- /dev/null
+++ b/include/perfetto/tracing/core/startup_trace_writer_registry.h
@@ -0,0 +1,149 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <vector>
+
+#include "perfetto/base/export.h"
+#include "perfetto/base/weak_ptr.h"
+#include "perfetto/tracing/core/basic_types.h"
+
+namespace perfetto {
+
+class SharedMemoryArbiterImpl;
+class StartupTraceWriter;
+class StartupTraceWriterRegistry;
+
+namespace base {
+class TaskRunner;
+}  // namespace base
+
+// Notifies the registry about the destruction of a StartupTraceWriter, provided
+// the registry itself wasn't deleted yet. The indirection via the handle is
+// necessary to avoid potential deadlocks caused by lock order inversion. These
+// issues are avoided by locking on the handle's common lock in the destructors
+// of the registry and writer.
+class StartupTraceWriterRegistryHandle {
+ public:
+  explicit StartupTraceWriterRegistryHandle(StartupTraceWriterRegistry*);
+
+  // Called by StartupTraceWriter destructor.
+  void OnWriterDestroyed(StartupTraceWriter*);
+
+  // Called by StartupTraceWriterRegistry destructor.
+  void OnRegistryDestroyed();
+
+ private:
+  StartupTraceWriterRegistryHandle(const StartupTraceWriterRegistryHandle&) =
+      delete;
+  StartupTraceWriterRegistryHandle& operator=(
+      const StartupTraceWriterRegistryHandle&) = delete;
+
+  std::mutex lock_;
+  StartupTraceWriterRegistry* registry_;
+};
+
+// Embedders can use this registry to create unbound StartupTraceWriters during
+// startup, and later bind them all safely to an arbiter and target buffer.
+class PERFETTO_EXPORT StartupTraceWriterRegistry {
+ public:
+  StartupTraceWriterRegistry();
+  ~StartupTraceWriterRegistry();
+
+  // Returns a new unbound StartupTraceWriter. Should only be called while
+  // unbound. Usually called on a writer thread.
+  std::unique_ptr<StartupTraceWriter> CreateUnboundTraceWriter();
+
+  // Return an unbound StartupTraceWriter back to the registry before it could
+  // be bound (usually called when the writer's thread is destroyed). The
+  // registry will keep this writer alive until the registry is bound to an
+  // arbiter (or destroyed itself). This way, its buffered data is retained.
+  // Should only be called while unbound. All packets written to the passed
+  // writer should have been completed and it should no longer be used to write
+  // data after calling this method.
+  void ReturnUnboundTraceWriter(std::unique_ptr<StartupTraceWriter>);
+
+  // Binds all StartupTraceWriters created by this registry to the given arbiter
+  // and target buffer. Should only be called once and on the passed
+  // TaskRunner's sequence. See
+  // SharedMemoryArbiter::BindStartupTraceWriterRegistry() for details.
+  //
+  // Note that the writers may not be bound synchronously if they are
+  // concurrently being written to. The registry will retry on the passed
+  // TaskRunner until all writers were bound successfully.
+  //
+  // Calls |on_bound_callback| asynchronously on |trace_writer| once all writers
+  // were bound.
+  void BindToArbiter(
+      SharedMemoryArbiterImpl*,
+      BufferID target_buffer,
+      base::TaskRunner*,
+      std::function<void(StartupTraceWriterRegistry*)> on_bound_callback);
+
+ private:
+  friend class StartupTraceWriterRegistryHandle;
+  friend class StartupTraceWriterTest;
+
+  StartupTraceWriterRegistry(const StartupTraceWriterRegistry&) = delete;
+  StartupTraceWriterRegistry& operator=(const StartupTraceWriterRegistry&) =
+      delete;
+
+  // Called by StartupTraceWriterRegistryHandle.
+  void OnStartupTraceWriterDestroyed(StartupTraceWriter*);
+
+  // Try to bind the remaining unbound writers and post a continuation to
+  // |task_runner_| if any writers could not be bound.
+  void TryBindWriters();
+
+  // Notifies the arbiter when we have bound all writers. May delete |this|.
+  void OnUnboundWritersRemovedLocked();
+
+  std::shared_ptr<StartupTraceWriterRegistryHandle> handle_;
+
+  // Begin lock-protected members.
+  std::mutex lock_;
+
+  // Unbound writers that we handed out to writer threads. These writers may be
+  // concurrently written to by the writer threads.
+  std::set<StartupTraceWriter*> unbound_writers_;
+
+  // Unbound writers that writer threads returned to the registry by calling
+  // ReturnUnboundTraceWriter(). Writers are removed from |unbound_writers_|
+  // when they are added to |unbound_owned_writers_|. No new data can be written
+  // to these writers.
+  std::vector<std::unique_ptr<StartupTraceWriter>> unbound_owned_writers_;
+
+  SharedMemoryArbiterImpl* arbiter_ = nullptr;  // |nullptr| while unbound.
+  BufferID target_buffer_ = 0;
+  base::TaskRunner* task_runner_;
+  std::function<void(StartupTraceWriterRegistry*)> on_bound_callback_ = nullptr;
+
+  // Keep at the end. Initialized during |BindToArbiter()|, like |task_runner_|.
+  // Weak pointers are only valid on |task_runner_|'s thread/sequence.
+  std::unique_ptr<base::WeakPtrFactory<StartupTraceWriterRegistry>>
+      weak_ptr_factory_;
+  // End lock-protected members.
+};
+
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index 4cf124e..06969cc 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -55,6 +55,7 @@
     "core/sliced_protobuf_input_stream.cc",
     "core/sliced_protobuf_input_stream.h",
     "core/startup_trace_writer.cc",
+    "core/startup_trace_writer_registry.cc",
     "core/sys_stats_config.cc",
     "core/test_config.cc",
     "core/trace_buffer.cc",
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index e4d0e2d..9a3f2d6 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -21,7 +21,7 @@
 #include "perfetto/base/time.h"
 #include "perfetto/tracing/core/commit_data_request.h"
 #include "perfetto/tracing/core/shared_memory.h"
-#include "perfetto/tracing/core/startup_trace_writer.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
 #include "src/tracing/core/null_trace_writer.h"
 #include "src/tracing/core/trace_writer_impl.h"
 
@@ -276,9 +276,35 @@
       new TraceWriterImpl(this, id, target_buffer));
 }
 
-bool SharedMemoryArbiterImpl::BindStartupTraceWriter(StartupTraceWriter* writer,
-                                                     BufferID target_buffer) {
-  return writer->BindToArbiter(this, target_buffer);
+void SharedMemoryArbiterImpl::BindStartupTraceWriterRegistry(
+    std::unique_ptr<StartupTraceWriterRegistry> registry,
+    BufferID target_buffer) {
+  // The registry will be owned by the arbiter, so it's safe to capture |this|
+  // in the callback.
+  auto on_bound_callback = [this](StartupTraceWriterRegistry* bound_registry) {
+    std::unique_ptr<StartupTraceWriterRegistry> registry_to_delete;
+    {
+      std::lock_guard<std::mutex> scoped_lock(lock_);
+
+      for (auto it = startup_trace_writer_registries_.begin();
+           it != startup_trace_writer_registries_.end(); it++) {
+        if (it->get() == bound_registry) {
+          // We can't delete the registry while the arbiter's lock is held
+          // (to avoid lock inversion).
+          registry_to_delete = std::move(*it);
+          startup_trace_writer_registries_.erase(it);
+          break;
+        }
+      }
+    }
+
+    // The registry should have been in |startup_trace_writer_registries_|.
+    PERFETTO_DCHECK(registry_to_delete);
+    registry_to_delete.reset();
+  };
+  registry->BindToArbiter(this, target_buffer, task_runner_, on_bound_callback);
+  std::lock_guard<std::mutex> scoped_lock(lock_);
+  startup_trace_writer_registries_.push_back(std::move(registry));
 }
 
 void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 0ef8abe..0707bcf 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -29,6 +29,7 @@
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/shared_memory_abi.h"
 #include "perfetto/tracing/core/shared_memory_arbiter.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
 #include "src/tracing/core/id_allocator.h"
 
 namespace perfetto {
@@ -105,13 +106,15 @@
   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
   std::unique_ptr<TraceWriter> CreateTraceWriter(
       BufferID target_buffer) override;
-  bool BindStartupTraceWriter(StartupTraceWriter* writer,
-                              BufferID target_buffer) override;
+  void BindStartupTraceWriterRegistry(
+      std::unique_ptr<StartupTraceWriterRegistry>,
+      BufferID target_buffer) override;
 
   void NotifyFlushComplete(FlushRequestID) override;
 
  private:
   friend class TraceWriterImpl;
+  friend class StartupTraceWriterTest;
 
   static SharedMemoryABI::PageLayout default_page_layout;
 
@@ -137,6 +140,10 @@
   std::unique_ptr<CommitDataRequest> commit_data_req_;
   size_t bytes_pending_commit_ = 0;  // SUM(chunk.size() : commit_data_req_).
   IdAllocator<WriterID> active_writer_ids_;
+  // Registries whose Bind() is in progress. We destroy each registry when their
+  // Bind() is complete or when the arbiter is destroyed itself.
+  std::vector<std::unique_ptr<StartupTraceWriterRegistry>>
+      startup_trace_writer_registries_;
   // --- End lock-protected members ---
 
   // Keep at the end.
diff --git a/src/tracing/core/startup_trace_writer.cc b/src/tracing/core/startup_trace_writer.cc
index eb1d320..f4f7f05 100644
--- a/src/tracing/core/startup_trace_writer.cc
+++ b/src/tracing/core/startup_trace_writer.cc
@@ -16,9 +16,13 @@
 
 #include "perfetto/tracing/core/startup_trace_writer.h"
 
+#include <numeric>
+
 #include "perfetto/base/logging.h"
+#include "perfetto/protozero/proto_utils.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 #include "perfetto/tracing/core/shared_memory_abi.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
 #include "src/tracing/core/patch_list.h"
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
 
@@ -54,9 +58,12 @@
   LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
       : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}
 
-  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk, size_t num_bytes) {
-    PERFETTO_CHECK(target_chunk->payload_size() >= num_bytes);
-    uint8_t* chunk_payload = target_chunk->payload_begin();
+  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk,
+                   size_t num_bytes,
+                   size_t cur_payload_size) {
+    PERFETTO_CHECK(target_chunk->payload_size() >=
+                   num_bytes + cur_payload_size);
+    uint8_t* target_ptr = target_chunk->payload_begin() + cur_payload_size;
     size_t bytes_read = 0;
     while (bytes_read < num_bytes) {
       if (cur_slice_ == buffer_slices_.end())
@@ -72,8 +79,8 @@
 
       size_t read_size = std::min(num_bytes - bytes_read,
                                   cur_slice_range.size() - cur_slice_offset_);
-      memcpy(chunk_payload + bytes_read,
-             cur_slice_range.begin + cur_slice_offset_, read_size);
+      memcpy(target_ptr + bytes_read, cur_slice_range.begin + cur_slice_offset_,
+             read_size);
       cur_slice_offset_ += read_size;
       bytes_read += read_size;
 
@@ -84,6 +91,23 @@
     return bytes_read;
   }
 
+  size_t TotalUsedSize() const {
+    size_t used_size = 0;
+    for (const auto& slice : buffer_slices_) {
+      used_size += slice.GetUsedRange().size();
+    }
+    return used_size;
+  }
+
+  bool DidReadAllData() const {
+    if (cur_slice_ == buffer_slices_.end())
+      return true;
+
+    const auto next_slice = cur_slice_ + 1;
+    return next_slice == buffer_slices_.end() &&
+           cur_slice_->GetUsedRange().size() == cur_slice_offset_;
+  }
+
  private:
   const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;
 
@@ -96,8 +120,10 @@
 
 }  // namespace
 
-StartupTraceWriter::StartupTraceWriter()
-    : memory_buffer_(new protozero::ScatteredHeapBuffer()),
+StartupTraceWriter::StartupTraceWriter(
+    std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle)
+    : registry_handle_(std::move(registry_handle)),
+      memory_buffer_(new protozero::ScatteredHeapBuffer()),
       memory_stream_writer_(
           new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
   memory_buffer_->set_writer(memory_stream_writer_.get());
@@ -108,7 +134,10 @@
     std::unique_ptr<TraceWriter> trace_writer)
     : was_bound_(true), trace_writer_(std::move(trace_writer)) {}
 
-StartupTraceWriter::~StartupTraceWriter() = default;
+StartupTraceWriter::~StartupTraceWriter() {
+  if (registry_handle_)
+    registry_handle_->OnWriterDestroyed(this);
+}
 
 bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
                                        BufferID target_buffer) {
@@ -126,18 +155,18 @@
     PERFETTO_DCHECK(cur_packet_->is_finalized());
     cur_packet_.reset();
   }
-  memory_stream_writer_.reset();
 
   trace_writer_ = arbiter->CreateTraceWriter(target_buffer);
-
   ChunkID next_chunk_id = CommitLocalBufferChunks(
       arbiter, trace_writer_->writer_id(), target_buffer);
-  memory_buffer_.reset();
 
   // The real TraceWriter should start writing at the subsequent chunk ID.
   bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
   PERFETTO_DCHECK(success);
 
+  memory_stream_writer_.reset();
+  memory_buffer_.reset();
+
   return true;
 }
 
@@ -195,7 +224,8 @@
   }
 
   // Can't flush while unbound.
-  callback();
+  if (callback)
+    callback();
 }
 
 WriterID StartupTraceWriter::writer_id() const {
@@ -245,7 +275,6 @@
   // Finalize() is a no-op because the packet is already finalized.
   uint32_t packet_size = cur_packet_->Finalize();
   packet_sizes_.push_back(packet_size);
-  total_payload_size += packet_size;
 
   // Write is complete, reset the flag to allow binding.
   std::lock_guard<std::mutex> lock(lock_);
@@ -266,8 +295,13 @@
   if (packet_sizes_.empty() || !writer_id)
     return 0;
 
+  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
   LocalBufferReader local_buffer_reader(memory_buffer_.get());
 
+  PERFETTO_DCHECK(local_buffer_reader.TotalUsedSize() ==
+                  std::accumulate(packet_sizes_.begin(), packet_sizes_.end(),
+                                  static_cast<size_t>(0u)));
+
   ChunkID next_chunk_id = 0;
   SharedMemoryABI::Chunk cur_chunk =
       NewChunk(arbiter, writer_id, next_chunk_id++, false);
@@ -284,29 +318,41 @@
     do {
       uint32_t fragment_size = static_cast<uint32_t>(
           std::min(static_cast<size_t>(remaining_packet_size),
-                   max_payload_size - cur_payload_size));
+                   max_payload_size - cur_payload_size -
+                       SharedMemoryABI::kPacketHeaderSize));
+      // Write packet header, i.e. the fragment size.
+      protozero::proto_utils::WriteRedundantVarInt(
+          fragment_size, cur_chunk.payload_begin() + cur_payload_size);
+      cur_payload_size += SharedMemoryABI::kPacketHeaderSize;
+
+      // Copy packet content into the chunk.
+      size_t bytes_read = local_buffer_reader.ReadBytes(
+          &cur_chunk, fragment_size, cur_payload_size);
+      PERFETTO_DCHECK(bytes_read == fragment_size);
+
       cur_payload_size += fragment_size;
       remaining_packet_size -= fragment_size;
 
       bool last_write =
           packet_idx == total_num_packets - 1 && remaining_packet_size == 0;
 
-      // Find num_packets that we should copy into current chunk and their
-      // payload_size.
-      bool write_chunk = cur_num_packets == ChunkHeader::Packets::kMaxCount ||
-                         cur_payload_size == max_payload_size || last_write;
+      // We should return the current chunk if we've filled its payload, reached
+      // the maximum number of packets, or wrote everything we wanted to.
+      bool return_chunk =
+          cur_payload_size >=
+              max_payload_size - SharedMemoryABI::kPacketHeaderSize ||
+          cur_num_packets == ChunkHeader::Packets::kMaxCount || last_write;
 
-      if (write_chunk) {
-        // Write chunk payload.
-        local_buffer_reader.ReadBytes(&cur_chunk, cur_payload_size);
-
+      if (return_chunk) {
         auto new_packet_count =
             cur_chunk.IncreasePacketCountTo(cur_num_packets);
         PERFETTO_DCHECK(new_packet_count == cur_num_packets);
 
         bool is_fragmenting = remaining_packet_size > 0;
-        if (is_fragmenting)
+        if (is_fragmenting) {
+          PERFETTO_DCHECK(cur_payload_size == max_payload_size);
           cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
+        }
 
         arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
                                       &empty_patch_list);
@@ -327,6 +373,8 @@
 
   // The last chunk should have been returned.
   PERFETTO_DCHECK(!cur_chunk.is_valid());
+  // We should have read all data from the local buffer.
+  PERFETTO_DCHECK(local_buffer_reader.DidReadAllData());
 
   return next_chunk_id;
 }
diff --git a/src/tracing/core/startup_trace_writer_registry.cc b/src/tracing/core/startup_trace_writer_registry.cc
new file mode 100644
index 0000000..6de4ab0
--- /dev/null
+++ b/src/tracing/core/startup_trace_writer_registry.cc
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
+
+#include <functional>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/tracing/core/startup_trace_writer.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
+
+using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
+
+namespace perfetto {
+
+StartupTraceWriterRegistryHandle::StartupTraceWriterRegistryHandle(
+    StartupTraceWriterRegistry* registry)
+    : registry_(registry) {}
+
+void StartupTraceWriterRegistryHandle::OnWriterDestroyed(
+    StartupTraceWriter* writer) {
+  std::lock_guard<std::mutex> lock(lock_);
+  if (registry_)
+    registry_->OnStartupTraceWriterDestroyed(writer);
+}
+
+void StartupTraceWriterRegistryHandle::OnRegistryDestroyed() {
+  std::lock_guard<std::mutex> lock(lock_);
+  registry_ = nullptr;
+}
+
+StartupTraceWriterRegistry::StartupTraceWriterRegistry()
+    : handle_(std::make_shared<StartupTraceWriterRegistryHandle>(this)) {}
+
+StartupTraceWriterRegistry::~StartupTraceWriterRegistry() {
+  handle_->OnRegistryDestroyed();
+}
+
+std::unique_ptr<StartupTraceWriter>
+StartupTraceWriterRegistry::CreateUnboundTraceWriter() {
+  std::lock_guard<std::mutex> lock(lock_);
+  PERFETTO_DCHECK(!arbiter_);  // Should only be called while unbound.
+  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter(handle_));
+  unbound_writers_.insert(writer.get());
+  return writer;
+}
+
+void StartupTraceWriterRegistry::ReturnUnboundTraceWriter(
+    std::unique_ptr<StartupTraceWriter> trace_writer) {
+  std::lock_guard<std::mutex> lock(lock_);
+  PERFETTO_DCHECK(!arbiter_);  // Should only be called while unbound.
+  PERFETTO_DCHECK(!trace_writer->write_in_progress_);
+  PERFETTO_DCHECK(unbound_writers_.count(trace_writer.get()));
+  unbound_writers_.erase(trace_writer.get());
+  unbound_owned_writers_.push_back(std::move(trace_writer));
+}
+
+void StartupTraceWriterRegistry::BindToArbiter(
+    SharedMemoryArbiterImpl* arbiter,
+    BufferID target_buffer,
+    base::TaskRunner* task_runner,
+    std::function<void(StartupTraceWriterRegistry*)> on_bound_callback) {
+  std::vector<std::unique_ptr<StartupTraceWriter>> unbound_owned_writers;
+  {
+    std::lock_guard<std::mutex> lock(lock_);
+    PERFETTO_DCHECK(!arbiter_);
+    arbiter_ = arbiter;
+    target_buffer_ = target_buffer;
+    task_runner_ = task_runner;
+    weak_ptr_factory_.reset(
+        new base::WeakPtrFactory<StartupTraceWriterRegistry>(this));
+    on_bound_callback_ = std::move(on_bound_callback);
+    // We can't destroy the writers while holding |lock_|, so we swap them out
+    // here instead. After we are bound, no more writers can be added to the
+    // list.
+    unbound_owned_writers.swap(unbound_owned_writers_);
+  }
+
+  // Bind and destroy the owned writers.
+  for (const auto& writer : unbound_owned_writers) {
+    // This should succeed since nobody can write to these writers concurrently.
+    bool success = writer->BindToArbiter(arbiter_, target_buffer_);
+    PERFETTO_DCHECK(success);
+  }
+  unbound_owned_writers.clear();
+
+  TryBindWriters();
+}
+
+void StartupTraceWriterRegistry::TryBindWriters() {
+  std::lock_guard<std::mutex> lock(lock_);
+  for (auto it = unbound_writers_.begin(); it != unbound_writers_.end();) {
+    if ((*it)->BindToArbiter(arbiter_, target_buffer_)) {
+      it = unbound_writers_.erase(it);
+    } else {
+      it++;
+    }
+  }
+  if (!unbound_writers_.empty()) {
+    auto weak_this = weak_ptr_factory_->GetWeakPtr();
+    task_runner_->PostTask([weak_this] {
+      if (weak_this)
+        weak_this->TryBindWriters();
+    });
+  }
+  OnUnboundWritersRemovedLocked();
+}
+
+void StartupTraceWriterRegistry::OnStartupTraceWriterDestroyed(
+    StartupTraceWriter* trace_writer) {
+  std::lock_guard<std::mutex> lock(lock_);
+  if (unbound_writers_.erase(trace_writer) > 0)
+    OnUnboundWritersRemovedLocked();
+}
+
+void StartupTraceWriterRegistry::OnUnboundWritersRemovedLocked() {
+  if (!unbound_writers_.empty() || !task_runner_ || !on_bound_callback_)
+    return;
+
+  PERFETTO_DCHECK(weak_ptr_factory_);
+  auto weak_this = weak_ptr_factory_->GetWeakPtr();
+  // Run callback in PostTask() since the callback may delete |this| and thus
+  // might otherwise cause a deadlock.
+  auto callback = on_bound_callback_;
+  on_bound_callback_ = nullptr;
+  task_runner_->PostTask([weak_this, callback]() {
+    if (!weak_this)
+      return;
+    // Note: callback may delete |this|.
+    callback(weak_this.get());
+  });
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/startup_trace_writer_unittest.cc b/src/tracing/core/startup_trace_writer_unittest.cc
index fd9ea45..2566455 100644
--- a/src/tracing/core/startup_trace_writer_unittest.cc
+++ b/src/tracing/core/startup_trace_writer_unittest.cc
@@ -17,17 +17,21 @@
 #include "perfetto/tracing/core/startup_trace_writer.h"
 
 #include "gtest/gtest.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
+#include "perfetto/tracing/core/trace_packet.h"
 #include "perfetto/tracing/core/tracing_service.h"
 #include "src/base/test/test_task_runner.h"
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
+#include "src/tracing/core/sliced_protobuf_input_stream.h"
+#include "src/tracing/core/trace_buffer.h"
 #include "src/tracing/test/aligned_buffer_test.h"
 #include "src/tracing/test/fake_producer_endpoint.h"
 
 #include "perfetto/trace/test_event.pbzero.h"
+#include "perfetto/trace/trace_packet.pb.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
-namespace {
 
 class StartupTraceWriterTest : public AlignedBufferTest {
  public:
@@ -46,16 +50,29 @@
     task_runner_.reset();
   }
 
+  std::unique_ptr<StartupTraceWriter> CreateUnboundWriter() {
+    std::shared_ptr<StartupTraceWriterRegistryHandle> registry;
+    return std::unique_ptr<StartupTraceWriter>(
+        new StartupTraceWriter(registry));
+  }
+
+  bool BindWriter(StartupTraceWriter* writer) {
+    const BufferID kBufId = 42;
+    return writer->BindToArbiter(arbiter_.get(), kBufId);
+  }
+
   void WritePackets(StartupTraceWriter* writer, size_t packet_count) {
     for (size_t i = 0; i < packet_count; i++) {
       auto packet = writer->NewTracePacket();
-      packet->set_for_testing()->set_str("foo");
+      packet->set_for_testing()->set_str(kPacketPayload);
     }
   }
 
-  void VerifyPacketCount(size_t expected_count) {
+  void VerifyPackets(size_t expected_count) {
     SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
-    size_t packets_count = 0;
+    auto buffer = TraceBuffer::Create(abi->size());
+
+    size_t total_packets_count = 0;
     ChunkID current_max_chunk_id = 0;
     for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
       uint32_t page_layout = abi->GetPageLayout(page_idx);
@@ -77,19 +94,74 @@
         current_max_chunk_id = std::max(current_max_chunk_id, chunk_id);
 
         auto packets_header = chunk.header()->packets.load();
-        packets_count += packets_header.count;
+        total_packets_count += packets_header.count;
         if (packets_header.flags &
             SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk) {
           // Don't count fragmented packets twice.
-          packets_count--;
+          total_packets_count--;
         }
+
+        buffer->CopyChunkUntrusted(
+            /*producer_id_trusted=*/1, /*producer_uid_trusted=*/1,
+            chunk.header()->writer_id.load(), chunk_id, packets_header.count,
+            packets_header.flags, /*chunk_complete=*/true,
+            chunk.payload_begin(), chunk.payload_size());
         abi->ReleaseChunkAsFree(std::move(chunk));
       }
     }
     last_read_max_chunk_id_ = current_max_chunk_id;
-    EXPECT_EQ(expected_count, packets_count);
+    EXPECT_EQ(expected_count, total_packets_count);
+
+    // Now verify chunk and packet contents.
+    buffer->BeginRead();
+    size_t num_packets_read = 0;
+    while (true) {
+      TracePacket packet;
+      uid_t producer_uid = kInvalidUid;
+      if (!buffer->ReadNextTracePacket(&packet, &producer_uid))
+        break;
+      EXPECT_EQ(static_cast<uid_t>(1), producer_uid);
+
+      SlicedProtobufInputStream stream(&packet.slices());
+      size_t size = 0;
+      for (const Slice& slice : packet.slices())
+        size += slice.size;
+      protos::TracePacket parsed_packet;
+      bool success = parsed_packet.ParseFromBoundedZeroCopyStream(
+          &stream, static_cast<int>(size));
+      EXPECT_TRUE(success);
+      if (!success)
+        break;
+      EXPECT_TRUE(parsed_packet.has_for_testing());
+      EXPECT_EQ(kPacketPayload, parsed_packet.for_testing().str());
+      num_packets_read++;
+    }
+    EXPECT_EQ(expected_count, num_packets_read);
   }
 
+  size_t GetUnboundWriterCount(
+      const StartupTraceWriterRegistry& registry) const {
+    return registry.unbound_writers_.size() +
+           registry.unbound_owned_writers_.size();
+  }
+
+  size_t GetBindingRegistriesCount(
+      const SharedMemoryArbiterImpl& arbiter) const {
+    return arbiter.startup_trace_writer_registries_.size();
+  }
+
+  size_t GetUnboundWriterCount(const SharedMemoryArbiterImpl& arbiter) const {
+    size_t count = 0u;
+    for (const auto& reg : arbiter.startup_trace_writer_registries_) {
+      count += reg->unbound_writers_.size();
+      count += reg->unbound_owned_writers_.size();
+    }
+    return count;
+  }
+
+ protected:
+  static constexpr char kPacketPayload[] = "foo";
+
   FakeProducerEndpoint fake_producer_endpoint_;
   std::unique_ptr<base::TestTaskRunner> task_runner_;
   std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
@@ -98,25 +170,27 @@
   ChunkID last_read_max_chunk_id_ = 0;
 };
 
+constexpr char StartupTraceWriterTest::kPacketPayload[];
+
+namespace {
+
 size_t const kPageSizes[] = {4096, 65536};
 INSTANTIATE_TEST_CASE_P(PageSize,
                         StartupTraceWriterTest,
                         ::testing::ValuesIn(kPageSizes));
 
 TEST_P(StartupTraceWriterTest, CreateUnboundAndBind) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
-  // Bind it right away without having written any data before.
-  const BufferID kBufId = 42;
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  // Bind writer right away without having written any data before.
+  EXPECT_TRUE(BindWriter(writer.get()));
 
   const size_t kNumPackets = 32;
   WritePackets(writer.get(), kNumPackets);
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumPackets);
+  VerifyPackets(kNumPackets);
 }
 
 TEST_P(StartupTraceWriterTest, CreateBound) {
@@ -130,12 +204,11 @@
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumPackets);
+  VerifyPackets(kNumPackets);
 }
 
 TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndDiscard) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
   const size_t kNumPackets = 32;
   WritePackets(writer.get(), kNumPackets);
@@ -143,22 +216,20 @@
   // Should discard the written data.
   writer.reset();
 
-  VerifyPacketCount(0);
+  VerifyPackets(0);
 }
 
 TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndBind) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
   const size_t kNumPackets = 32;
   WritePackets(writer.get(), kNumPackets);
 
   // Binding the writer should cause the previously written packets to be
   // written to the SMB and committed.
-  const BufferID kBufId = 42;
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  EXPECT_TRUE(BindWriter(writer.get()));
 
-  VerifyPacketCount(kNumPackets);
+  VerifyPackets(kNumPackets);
 
   // Any further packets should be written to the SMB directly.
   const size_t kNumAdditionalPackets = 16;
@@ -166,12 +237,11 @@
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumAdditionalPackets);
+  VerifyPackets(kNumAdditionalPackets);
 }
 
 TEST_P(StartupTraceWriterTest, WriteMultipleChunksWhileUnboundAndBind) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
   // Write a single packet to determine its size in the buffer.
   WritePackets(writer.get(), 1);
@@ -183,10 +253,9 @@
 
   // Binding the writer should cause the previously written packets to be
   // written to the SMB and committed.
-  const BufferID kBufId = 42;
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+  EXPECT_TRUE(BindWriter(writer.get()));
 
-  VerifyPacketCount(kNumPackets + 1);
+  VerifyPackets(kNumPackets + 1);
 
   // Any further packets should be written to the SMB directly.
   const size_t kNumAdditionalPackets = 16;
@@ -194,25 +263,62 @@
   // Finalizes the last packet and returns the chunk.
   writer.reset();
 
-  VerifyPacketCount(kNumAdditionalPackets);
+  VerifyPackets(kNumAdditionalPackets);
 }
 
 TEST_P(StartupTraceWriterTest, BindingWhileWritingFails) {
-  // Create an unbound writer.
-  std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+  auto writer = CreateUnboundWriter();
 
-  const BufferID kBufId = 42;
   {
-    // Begin a write by opening a TracePacket
+    // Begin a write by opening a TracePacket.
     auto packet = writer->NewTracePacket();
+    packet->set_for_testing()->set_str(kPacketPayload);
 
     // Binding while writing should fail.
-    EXPECT_FALSE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+    EXPECT_FALSE(BindWriter(writer.get()));
   }
 
   // Packet was completed, so binding should work now and emit the packet.
-  EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
-  VerifyPacketCount(1);
+  EXPECT_TRUE(BindWriter(writer.get()));
+  VerifyPackets(1);
+}
+
+TEST_P(StartupTraceWriterTest, CreateAndBindViaRegistry) {
+  std::unique_ptr<StartupTraceWriterRegistry> registry(
+      new StartupTraceWriterRegistry());
+
+  // Create unbound writers.
+  auto writer1 = registry->CreateUnboundTraceWriter();
+  auto writer2 = registry->CreateUnboundTraceWriter();
+
+  EXPECT_EQ(2u, GetUnboundWriterCount(*registry));
+
+  // Return |writer2|. It should be kept alive until the registry is bound.
+  registry->ReturnUnboundTraceWriter(std::move(writer2));
+
+  {
+    // Begin a write by opening a TracePacket on |writer1|.
+    auto packet = writer1->NewTracePacket();
+
+    // Binding |writer1| writing should fail, but |writer2| should be bound.
+    const BufferID kBufId = 42;
+    arbiter_->BindStartupTraceWriterRegistry(std::move(registry), kBufId);
+    EXPECT_EQ(1u, GetUnboundWriterCount(*arbiter_));
+  }
+
+  // Wait for |writer1| to be bound and the registry to be deleted.
+  auto checkpoint_name = "all_bound";
+  auto all_bound = task_runner_->CreateCheckpoint(checkpoint_name);
+  std::function<void()> task;
+  task = [&task, &all_bound, this]() {
+    if (!GetBindingRegistriesCount(*arbiter_)) {
+      all_bound();
+      return;
+    }
+    task_runner_->PostDelayedTask(task, 1);
+  };
+  task_runner_->PostDelayedTask(task, 1);
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
 }  // namespace