Protozero: fall back on the heap when nesting too deep

This CL lifts the historic restriction on the max nesting
of protozero messages. It does so by removing the
stack-based |nested_messages_arena_| hack and instead using
an explicit MessageArena object that is responsible for
memory ownership of protozero Message objects.
The difference between root and nested messages is made
explicit by introducing RootMessage<T> (as opposed to
just Message<T>), which owns the storage for all the
submessages.
This CL still maintains the property of "(mostly) zero
allocations": the MessageArena 0th block is allocated
when the TraceWriterImpl is constructed and in most cases
is never expanded. It gets expanded, by allocating new
blocks, only when writing a message that is nested too deep.
Turns out the change is quite self-contained as most
tracing obtains Message objects via
TraceWriter::NewTracePacket(). The places that require
code changes are very limited.

Bug: 163125063
Bug: 125322557
Bug: crbug.com/960303
Change-Id: I8d7a313d81d0c407ef2c549ff5bc9febb30a54a0
diff --git a/Android.bp b/Android.bp
index 37c42cf..591e31b 100644
--- a/Android.bp
+++ b/Android.bp
@@ -6484,6 +6484,7 @@
   srcs: [
     "src/protozero/field.cc",
     "src/protozero/message.cc",
+    "src/protozero/message_arena.cc",
     "src/protozero/message_handle.cc",
     "src/protozero/packed_repeated_fields.cc",
     "src/protozero/proto_decoder.cc",
diff --git a/BUILD b/BUILD
index 576dc0e..0ce0dc9 100644
--- a/BUILD
+++ b/BUILD
@@ -413,10 +413,12 @@
         "include/perfetto/protozero/cpp_message_obj.h",
         "include/perfetto/protozero/field.h",
         "include/perfetto/protozero/message.h",
+        "include/perfetto/protozero/message_arena.h",
         "include/perfetto/protozero/message_handle.h",
         "include/perfetto/protozero/packed_repeated_fields.h",
         "include/perfetto/protozero/proto_decoder.h",
         "include/perfetto/protozero/proto_utils.h",
+        "include/perfetto/protozero/root_message.h",
         "include/perfetto/protozero/scattered_heap_buffer.h",
         "include/perfetto/protozero/scattered_stream_null_delegate.h",
         "include/perfetto/protozero/scattered_stream_writer.h",
@@ -665,6 +667,7 @@
     srcs = [
         "src/protozero/field.cc",
         "src/protozero/message.cc",
+        "src/protozero/message_arena.cc",
         "src/protozero/message_handle.cc",
         "src/protozero/packed_repeated_fields.cc",
         "src/protozero/proto_decoder.cc",
diff --git a/include/perfetto/base/compiler.h b/include/perfetto/base/compiler.h
index 334c3f9..b228411 100644
--- a/include/perfetto/base/compiler.h
+++ b/include/perfetto/base/compiler.h
@@ -66,6 +66,21 @@
 #define PERFETTO_THREAD_LOCAL thread_local
 #endif
 
+#if defined(__clang__)
+#if __has_feature(address_sanitizer) || defined(__SANITIZE_ADDRESS__)
+extern "C" void __asan_poison_memory_region(void const volatile*, size_t);
+extern "C" void __asan_unpoison_memory_region(void const volatile*, size_t);
+#define PERFETTO_ASAN_POISON(a, s) __asan_poison_memory_region((a), (s))
+#define PERFETTO_ASAN_UNPOISON(a, s) __asan_unpoison_memory_region((a), (s))
+#else
+#define PERFETTO_ASAN_POISON(addr, size)
+#define PERFETTO_ASAN_UNPOISON(addr, size)
+#endif  // __has_feature(address_sanitizer)
+#else
+#define PERFETTO_ASAN_POISON(addr, size)
+#define PERFETTO_ASAN_UNPOISON(addr, size)
+#endif  // __clang__
+
 namespace perfetto {
 namespace base {
 
diff --git a/include/perfetto/protozero/BUILD.gn b/include/perfetto/protozero/BUILD.gn
index ee0f77a..93fdce4 100644
--- a/include/perfetto/protozero/BUILD.gn
+++ b/include/perfetto/protozero/BUILD.gn
@@ -20,10 +20,12 @@
     "cpp_message_obj.h",
     "field.h",
     "message.h",
+    "message_arena.h",
     "message_handle.h",
     "packed_repeated_fields.h",
     "proto_decoder.h",
     "proto_utils.h",
+    "root_message.h",
     "scattered_heap_buffer.h",
     "scattered_stream_null_delegate.h",
     "scattered_stream_writer.h",
diff --git a/include/perfetto/protozero/message.h b/include/perfetto/protozero/message.h
index b53dcda..a47db7e 100644
--- a/include/perfetto/protozero/message.h
+++ b/include/perfetto/protozero/message.h
@@ -38,30 +38,28 @@
 
 namespace protozero {
 
+class MessageArena;
 class MessageHandleBase;
 
 // Base class extended by the proto C++ stubs generated by the ProtoZero
 // compiler. This class provides the minimal runtime required to support
 // append-only operations and is designed for performance. None of the methods
-// require any dynamic memory allocation.
+// require any dynamic memory allocation, unless the nesting depth reached via
+// BeginNestedMessage() calls exceeds 16 levels.
 class PERFETTO_EXPORT Message {
  public:
   friend class MessageHandleBase;
 
-  // Adjust the |nested_messages_arena_| size when changing this, or the
-  // static_assert in the .cc file will bark.
-  static constexpr uint32_t kMaxNestingDepth = 10;
-
-  // Ctor and Dtor of Message are never called, with the exeception
-  // of root (non-nested) messages. Nested messages are allocated via placement
-  // new in the |nested_messages_arena_| and implictly destroyed when the arena
-  // of the root message goes away. This is fine as long as all the fields are
-  // PODs, which is checked by the static_assert in the ctor (see the Reset()
-  // method in the .cc file).
+  // The ctor is deliberately a no-op to avoid forwarding args from all
+  // subclasses. The real initialization is performed by Reset().
+  // Nested messages are allocated via placement new by MessageArena and
+  // implicitly destroyed when the RootMessage's arena goes away. This is
+  // fine as long as all the fields are PODs, which is checked by the
+  // static_assert()s in the Reset() method.
   Message() = default;
 
   // Clears up the state, allowing the message to be reused as a fresh one.
-  void Reset(ScatteredStreamWriter*);
+  void Reset(ScatteredStreamWriter*, MessageArena*);
 
   // Commits all the changes to the buffer (backfills the size field of this and
   // all nested messages) and seals the message. Returns the size of the message
@@ -156,10 +154,9 @@
                               ContiguousMemoryRange* ranges,
                               size_t num_ranges);
 
-  // Begins a nested message, using the static storage provided by the parent
-  // class (see comment in |nested_messages_arena_|). The nested message ends
-  // either when Finalize() is called or when any other Append* method is called
-  // in the parent class.
+  // Begins a nested message. The returned object is owned by the MessageArena
+  // of the root message. The nested message ends either when Finalize() is
+  // called or when any other Append* method is called in the parent class.
   // The template argument T is supposed to be a stub class auto generated from
   // a .proto, hence a subclass of Message.
   template <class T>
@@ -170,9 +167,7 @@
                   "T must be a subclass of Message");
     static_assert(sizeof(T) == sizeof(Message),
                   "Message subclasses cannot introduce extra state.");
-    T* message = reinterpret_cast<T*>(nested_messages_arena_);
-    BeginNestedMessageInternal(field_id, message);
-    return message;
+    return static_cast<T*>(BeginNestedMessageInternal(field_id));
   }
 
   ScatteredStreamWriter* stream_writer_for_testing() { return stream_writer_; }
@@ -191,7 +186,7 @@
   Message(const Message&) = delete;
   Message& operator=(const Message&) = delete;
 
-  void BeginNestedMessageInternal(uint32_t field_id, Message*);
+  Message* BeginNestedMessageInternal(uint32_t field_id);
 
   // Called by Finalize and Append* methods.
   void EndNestedMessage();
@@ -210,6 +205,22 @@
   // The stream writer interface used for the serialization.
   ScatteredStreamWriter* stream_writer_;
 
+  // The storage used to allocate nested Message objects.
+  // This is owned by RootMessage<T>.
+  MessageArena* arena_;
+
+  // Pointer to the last child message created through BeginNestedMessage(), if
+  // any, nullptr otherwise. There is no need to keep track of more than one
+  // message per nesting level as the proto-zero API contract mandates that
+  // nested fields can be filled only in a stacked fashion. In other words,
+  // nested messages are finalized and sealed when any other field is set in the
+  // parent message (or the parent message itself is finalized) and cannot be
+  // accessed anymore afterwards.
+  Message* nested_message_;
+
+  // [optional] Pointer to a non-aligned pre-reserved var-int slot of
+  // kMessageLengthFieldSize bytes. When set, the Finalize() method will write
+  // the size of proto-encoded message in the pointed memory region.
   uint8_t* size_field_;
 
   // Keeps track of the size of the current message.
@@ -222,10 +233,6 @@
   // attempts of writing to a message which has been Finalize()-d.
   bool finalized_;
 
-  // Used to detect attemps to create messages with a nesting level >
-  // kMaxNestingDepth. |nesting_depth_| == 0 for root (non-nested) messages.
-  uint8_t nesting_depth_;
-
 #if PERFETTO_DCHECK_IS_ON()
   // Current generation of message. Incremented on Reset.
   // Used to detect stale handles.
@@ -233,28 +240,6 @@
 
   MessageHandleBase* handle_;
 #endif
-
-  // Pointer to the last child message created through BeginNestedMessage(), if
-  // any, nullptr otherwise. There is no need to keep track of more than one
-  // message per nesting level as the proto-zero API contract mandates that
-  // nested fields can be filled only in a stacked fashion. In other words,
-  // nested messages are finalized and sealed when any other field is set in the
-  // parent message (or the parent message itself is finalized) and cannot be
-  // accessed anymore afterwards.
-  // TODO(primiano): optimization: I think that nested_message_, when non-null.
-  // will always be @ (this) + offsetof(nested_messages_arena_).
-  Message* nested_message_;
-
-  // The root message owns the storage for all its nested messages, up to a max
-  // of kMaxNestingDepth levels (see the .cc file). Note that the boundaries of
-  // the arena are meaningful only for the root message.
-  // Unfortunately we cannot put the sizeof() math here because we cannot sizeof
-  // the current class in a header. However the .cc file has a static_assert
-  // that guarantees that (see the Reset() method in the .cc file).
-  alignas(sizeof(void*)) uint8_t nested_messages_arena_[512];
-
-  // DO NOT add any fields below |nested_messages_arena_|. The memory layout of
-  // nested messages would overflow the storage allocated by the root message.
 };
 
 }  // namespace protozero
diff --git a/include/perfetto/protozero/message_arena.h b/include/perfetto/protozero/message_arena.h
new file mode 100644
index 0000000..4905dae
--- /dev/null
+++ b/include/perfetto/protozero/message_arena.h
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_PROTOZERO_MESSAGE_ARENA_H_
+#define INCLUDE_PERFETTO_PROTOZERO_MESSAGE_ARENA_H_
+
+#include <stdint.h>
+
+#include <list>
+#include <type_traits>
+
+#include "perfetto/base/export.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/protozero/message.h"
+
+namespace protozero {
+
+class Message;
+
+// Object allocator for fixed-sized protozero::Message objects.
+// It's a simple bump-pointer allocator which leverages the stack-alike
+// usage pattern of protozero nested messages. It avoids hitting the system
+// allocator in most cases, by reusing the same block, and falls back on
+// allocating new blocks only when using deeply nested messages (which are
+// extremely rare).
+// This is used by RootMessage<T> to handle the storage for root-level messages.
+class PERFETTO_EXPORT MessageArena {
+ public:
+  MessageArena();
+  ~MessageArena();
+
+  // Strictly no copies or moves as this is used to hand out pointers.
+  MessageArena(const MessageArena&) = delete;
+  MessageArena& operator=(const MessageArena&) = delete;
+  MessageArena(MessageArena&&) = delete;
+  MessageArena& operator=(MessageArena&&) = delete;
+
+  // Allocates a new Message object.
+  Message* NewMessage();
+
+  // Deletes the last message allocated. The |msg| argument is used only for
+  // DCHECKs, it MUST be the pointer obtained by the last NewMessage() call.
+  void DeleteLastMessage(Message* msg) {
+    PERFETTO_DCHECK(!blocks_.empty() && blocks_.back().entries > 0);
+    PERFETTO_DCHECK(&blocks_.back().storage[blocks_.back().entries - 1] ==
+                    static_cast<void*>(msg));
+    DeleteLastMessageInternal();
+  }
+
+  // Resets the state of the arena, clearing up all but one block. This is used
+  // to avoid leaking outstanding unfinished sub-messages while recycling the
+  // RootMessage object (this is extremely rare due to the RAII scoped handles
+  // but could happen if some client does some overly clever std::move() trick).
+  void Reset() {
+    PERFETTO_DCHECK(!blocks_.empty());
+    blocks_.resize(1);
+    auto& block = blocks_.back();
+    block.entries = 0;
+    PERFETTO_ASAN_POISON(block.storage, sizeof(block.storage));
+  }
+
+ private:
+  void DeleteLastMessageInternal();
+
+  struct Block {
+    static constexpr size_t kCapacity = 16;
+
+    Block() { PERFETTO_ASAN_POISON(storage, sizeof(storage)); }
+
+    std::aligned_storage<sizeof(Message), alignof(Message)>::type
+        storage[kCapacity];
+    uint32_t entries = 0;  // # Message entries used (<= kCapacity).
+  };
+
+  // blocks are used to hand out pointers and must not be moved. Hence why
+  // std::list rather than std::vector.
+  std::list<Block> blocks_;
+};
+
+}  // namespace protozero
+
+#endif  // INCLUDE_PERFETTO_PROTOZERO_MESSAGE_ARENA_H_
diff --git a/include/perfetto/protozero/root_message.h b/include/perfetto/protozero/root_message.h
new file mode 100644
index 0000000..40e2328
--- /dev/null
+++ b/include/perfetto/protozero/root_message.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_PROTOZERO_ROOT_MESSAGE_H_
+#define INCLUDE_PERFETTO_PROTOZERO_ROOT_MESSAGE_H_
+
+#include "perfetto/protozero/message.h"
+#include "perfetto/protozero/message_arena.h"
+
+namespace protozero {
+
+// Helper class to hand out messages using the default MessageArena.
+// Usage:
+// RootMessage<perfetto::protos::zero::MyMessage> msg;
+// msg.Reset(stream_writer);
+// msg.set_foo(...);
+// auto* nested = msg.set_nested();
+template <typename T = Message>
+class RootMessage : public T {
+ public:
+  RootMessage() { T::Reset(nullptr, &root_arena_); }
+
+  // Disallow copy and move.
+  RootMessage(const RootMessage&) = delete;
+  RootMessage& operator=(const RootMessage&) = delete;
+  RootMessage(RootMessage&&) = delete;
+  RootMessage& operator=(RootMessage&&) = delete;
+
+  void Reset(ScatteredStreamWriter* writer) {
+    root_arena_.Reset();
+    Message::Reset(writer, &root_arena_);
+  }
+
+ private:
+  MessageArena root_arena_;
+};
+
+}  // namespace protozero
+
+#endif  // INCLUDE_PERFETTO_PROTOZERO_ROOT_MESSAGE_H_
diff --git a/include/perfetto/protozero/scattered_heap_buffer.h b/include/perfetto/protozero/scattered_heap_buffer.h
index bc6a0d2..a42c977 100644
--- a/include/perfetto/protozero/scattered_heap_buffer.h
+++ b/include/perfetto/protozero/scattered_heap_buffer.h
@@ -23,6 +23,7 @@
 
 #include "perfetto/base/export.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/protozero/root_message.h"
 #include "perfetto/protozero/scattered_stream_writer.h"
 
 namespace protozero {
@@ -165,7 +166,7 @@
  private:
   ScatteredHeapBuffer shb_;
   ScatteredStreamWriter writer_;
-  T msg_;
+  RootMessage<T> msg_;
 };
 
 }  // namespace protozero
diff --git a/include/perfetto/protozero/static_buffer.h b/include/perfetto/protozero/static_buffer.h
index 6f5924f..3d7ec3d 100644
--- a/include/perfetto/protozero/static_buffer.h
+++ b/include/perfetto/protozero/static_buffer.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "perfetto/base/export.h"
+#include "perfetto/protozero/root_message.h"
 #include "perfetto/protozero/scattered_stream_writer.h"
 
 namespace protozero {
@@ -79,7 +80,7 @@
  private:
   StaticBufferDelegate delegate_;
   ScatteredStreamWriter writer_;
-  T msg_;
+  RootMessage<T> msg_;
 };
 
 // Helper function to create stack-based protozero messages in one line.
diff --git a/src/protozero/BUILD.gn b/src/protozero/BUILD.gn
index de6c11c..e1a9036 100644
--- a/src/protozero/BUILD.gn
+++ b/src/protozero/BUILD.gn
@@ -30,6 +30,7 @@
   sources = [
     "field.cc",
     "message.cc",
+    "message_arena.cc",
     "message_handle.cc",
     "packed_repeated_fields.cc",
     "proto_decoder.cc",
diff --git a/src/protozero/message.cc b/src/protozero/message.cc
index 419f7f1..cfc9b37 100644
--- a/src/protozero/message.cc
+++ b/src/protozero/message.cc
@@ -20,6 +20,7 @@
 #include <type_traits>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/protozero/message_arena.h"
 #include "perfetto/protozero/message_handle.h"
 
 #if __BYTE_ORDER__ != __ORDER_LITTLE_ENDIAN__
@@ -38,14 +39,11 @@
 
 }  // namespace
 
-// static
-constexpr uint32_t Message::kMaxNestingDepth;
-
 // Do NOT put any code in the constructor or use default initialization.
-// Use the Reset() method below instead. See the header for the reason why.
+// Use the Reset() method below instead.
 
 // This method is called to initialize both root and nested messages.
-void Message::Reset(ScatteredStreamWriter* stream_writer) {
+void Message::Reset(ScatteredStreamWriter* stream_writer, MessageArena* arena) {
 // Older versions of libstdcxx don't have is_trivially_constructible.
 #if !defined(__GLIBCXX__) || __GLIBCXX__ >= 20170516
   static_assert(std::is_trivially_constructible<Message>::value,
@@ -54,19 +52,12 @@
 
   static_assert(std::is_trivially_destructible<Message>::value,
                 "Message must be trivially destructible");
-
-  static_assert(
-      sizeof(Message::nested_messages_arena_) >=
-          kMaxNestingDepth *
-              (sizeof(Message) - sizeof(Message::nested_messages_arena_)),
-      "Message::nested_messages_arena_ is too small");
-
   stream_writer_ = stream_writer;
+  arena_ = arena;
   size_ = 0;
   size_field_ = nullptr;
   size_already_written_ = 0;
   nested_message_ = nullptr;
-  nesting_depth_ = 0;
   finalized_ = false;
 #if PERFETTO_DCHECK_IS_ON()
   handle_ = nullptr;
@@ -147,7 +138,7 @@
   return size_;
 }
 
-void Message::BeginNestedMessageInternal(uint32_t field_id, Message* message) {
+Message* Message::BeginNestedMessageInternal(uint32_t field_id) {
   if (nested_message_)
     EndNestedMessage();
 
@@ -157,20 +148,22 @@
       proto_utils::MakeTagLengthDelimited(field_id), data);
   WriteToStream(data, data_end);
 
-  message->Reset(stream_writer_);
-  PERFETTO_CHECK(nesting_depth_ < kMaxNestingDepth);
-  message->nesting_depth_ = nesting_depth_ + 1;
+  Message* message = arena_->NewMessage();
+  message->Reset(stream_writer_, arena_);
 
   // The length of the nested message cannot be known upfront. So right now
   // just reserve the bytes to encode the size after the nested message is done.
   message->set_size_field(
       stream_writer_->ReserveBytes(proto_utils::kMessageLengthFieldSize));
   size_ += proto_utils::kMessageLengthFieldSize;
+
   nested_message_ = message;
+  return message;
 }
 
 void Message::EndNestedMessage() {
   size_ += nested_message_->Finalize();
+  arena_->DeleteLastMessage(nested_message_);
   nested_message_ = nullptr;
 }
 
diff --git a/src/protozero/message_arena.cc b/src/protozero/message_arena.cc
new file mode 100644
index 0000000..6e92cd0
--- /dev/null
+++ b/src/protozero/message_arena.cc
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/protozero/message_arena.h"
+
+#include <atomic>
+#include <type_traits>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/protozero/message_handle.h"
+
+namespace protozero {
+
+MessageArena::MessageArena() {
+  // The code below assumes that there is always at least one block.
+  blocks_.emplace_front();
+  static_assert(std::alignment_of<decltype(blocks_.back().storage[0])>::value >=
+                    alignof(Message),
+                "MessageArea's storage is not properly aligned");
+}
+
+MessageArena::~MessageArena() = default;
+
+Message* MessageArena::NewMessage() {
+  PERFETTO_DCHECK(!blocks_.empty());  // Should never become empty.
+
+  Block* block = &blocks_.back();
+  if (PERFETTO_UNLIKELY(block->entries >= Block::kCapacity)) {
+    blocks_.emplace_back();
+    block = &blocks_.back();
+  }
+  const auto idx = block->entries++;
+  void* storage = &block->storage[idx];
+  PERFETTO_ASAN_UNPOISON(storage, sizeof(Message));
+  return new (storage) Message();
+}
+
+void MessageArena::DeleteLastMessageInternal() {
+  PERFETTO_DCHECK(!blocks_.empty());  // Should never be empty, see below.
+  Block* block = &blocks_.back();
+  PERFETTO_DCHECK(block->entries > 0);
+
+  // This is the reason why there is no ~Message() call here.
+  // MessageArena::Reset() (see header) also relies on dtor being trivial.
+  static_assert(std::is_trivially_destructible<Message>::value,
+                "Message must be trivially destructible");
+
+  --block->entries;
+  PERFETTO_ASAN_POISON(&block->storage[block->entries], sizeof(Message));
+
+  // Don't remove the first block to avoid malloc/free calls when the root
+  // message is reset. Hitting the allocator all the time is a waste of time.
+  if (block->entries == 0 && blocks_.size() > 1) {
+    blocks_.pop_back();
+  }
+}
+
+}  // namespace protozero
diff --git a/src/protozero/message_handle_unittest.cc b/src/protozero/message_handle_unittest.cc
index 7f0ee08..96c8600 100644
--- a/src/protozero/message_handle_unittest.cc
+++ b/src/protozero/message_handle_unittest.cc
@@ -16,7 +16,7 @@
 
 #include "perfetto/protozero/message_handle.h"
 
-#include "perfetto/protozero/message.h"
+#include "perfetto/protozero/root_message.h"
 #include "test/gtest_and_gmock.h"
 
 namespace protozero {
@@ -24,8 +24,7 @@
 namespace {
 
 TEST(MessageHandleTest, MoveHandleSharedMessageDoesntFinalize) {
-  Message message;
-  message.Reset(nullptr);
+  RootMessage<Message> message;
 
   MessageHandle<Message> handle_1(&message);
   handle_1 = MessageHandle<Message>(&message);
diff --git a/src/protozero/message_unittest.cc b/src/protozero/message_unittest.cc
index d75dcd0..a061ad9 100644
--- a/src/protozero/message_unittest.cc
+++ b/src/protozero/message_unittest.cc
@@ -15,7 +15,6 @@
  */
 
 #include "perfetto/protozero/message.h"
-#include "perfetto/protozero/message_handle.h"
 
 #include <limits>
 #include <memory>
@@ -23,6 +22,8 @@
 #include <vector>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/protozero/message_handle.h"
+#include "perfetto/protozero/root_message.h"
 #include "src/base/test/utils.h"
 #include "src/protozero/test/fake_scattered_buffer.h"
 #include "test/gtest_and_gmock.h"
@@ -38,7 +39,7 @@
 constexpr const char kEndWatermark[] = {'9', '8', '7', '6',
                                         'z', 'w', 'y', '\0'};
 
-class FakeRootMessage : public Message {};
+class FakeRootMessage : public RootMessage<Message> {};
 class FakeChildMessage : public Message {};
 
 uint32_t SimpleHash(const std::string& str) {
@@ -63,7 +64,10 @@
       EXPECT_STREQ(kStartWatermark, reinterpret_cast<char*>(mem.get()));
       EXPECT_STREQ(kEndWatermark,
                    reinterpret_cast<char*>(mem.get() + sizeof(kStartWatermark) +
-                                           sizeof(Message)));
+                                           sizeof(FakeRootMessage)));
+      FakeRootMessage* msg = reinterpret_cast<FakeRootMessage*>(
+          mem.get() + sizeof(kStartWatermark));
+      msg->~FakeRootMessage();
       mem.reset();
     }
     messages_.clear();
@@ -83,7 +87,7 @@
     memcpy(msg_start + sizeof(FakeRootMessage), kEndWatermark,
            sizeof(kEndWatermark));
     messages_.push_back(std::move(mem));
-    FakeRootMessage* msg = reinterpret_cast<FakeRootMessage*>(msg_start);
+    FakeRootMessage* msg = new (msg_start) FakeRootMessage();
     msg->Reset(stream_writer_.get());
     return msg;
   }
@@ -101,14 +105,16 @@
     return buffer_->GetBytesAsString(old_readback_pos, num_bytes);
   }
 
-  static void BuildNestedMessages(Message* msg, uint32_t depth = 0) {
+  static void BuildNestedMessages(Message* msg,
+                                  uint32_t max_depth,
+                                  uint32_t depth = 0) {
     for (uint32_t i = 1; i <= 128; ++i)
       msg->AppendBytes(i, kTestBytes, sizeof(kTestBytes));
 
-    if (depth < Message::kMaxNestingDepth) {
+    if (depth < max_depth) {
       auto* nested_msg =
           msg->BeginNestedMessage<FakeChildMessage>(1 + depth * 10);
-      BuildNestedMessages(nested_msg, depth + 1);
+      BuildNestedMessages(nested_msg, max_depth, depth + 1);
     }
 
     for (uint32_t i = 129; i <= 256; ++i)
@@ -279,7 +285,7 @@
   std::vector<Message*> nested_msgs;
 
   Message* root_msg = NewMessage();
-  BuildNestedMessages(root_msg);
+  BuildNestedMessages(root_msg, /*max_depth=*/10);
   root_msg->Finalize();
 
   // The main point of this test is to stress the code paths and test for
@@ -291,13 +297,24 @@
   EXPECT_EQ(0xf9e32b65, buf_hash);
 }
 
+TEST_F(MessageTest, DeeplyNested) {
+  std::vector<Message*> nested_msgs;
+
+  Message* root_msg = NewMessage();
+  BuildNestedMessages(root_msg, /*max_depth=*/1000);
+  root_msg->Finalize();
+
+  std::string full_buf = GetNextSerializedBytes(GetNumSerializedBytes());
+  size_t buf_hash = SimpleHash(full_buf);
+  EXPECT_EQ(0xc0fde419, buf_hash);
+}
+
 TEST_F(MessageTest, DestructInvalidMessageHandle) {
   FakeRootMessage* msg = NewMessage();
-  EXPECT_DCHECK_DEATH(
-      {
-        MessageHandle<FakeRootMessage> handle(msg);
-        ResetMessage(msg);
-      });
+  EXPECT_DCHECK_DEATH({
+    MessageHandle<FakeRootMessage> handle(msg);
+    ResetMessage(msg);
+  });
 }
 
 TEST_F(MessageTest, MessageHandle) {
diff --git a/src/traced/probes/ftrace/cpu_reader_benchmark.cc b/src/traced/probes/ftrace/cpu_reader_benchmark.cc
index ee6d246..063eefe 100644
--- a/src/traced/probes/ftrace/cpu_reader_benchmark.cc
+++ b/src/traced/probes/ftrace/cpu_reader_benchmark.cc
@@ -15,6 +15,7 @@
 #include <benchmark/benchmark.h>
 
 #include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/root_message.h"
 #include "perfetto/protozero/scattered_stream_null_delegate.h"
 #include "perfetto/protozero/scattered_stream_writer.h"
 #include "protos/perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
@@ -310,7 +311,7 @@
 
   ScatteredStreamWriterNullDelegate delegate(perfetto::base::kPageSize);
   ScatteredStreamWriter stream(&delegate);
-  FtraceEventBundle writer;
+  protozero::RootMessage<FtraceEventBundle> writer;
 
   ProtoTranslationTable* table = GetTable(test_case->name);
   auto page = PageFromXxd(test_case->data);
diff --git a/src/tracing/core/null_trace_writer.cc b/src/tracing/core/null_trace_writer.cc
index 40fe9a5..784c2c1 100644
--- a/src/tracing/core/null_trace_writer.cc
+++ b/src/tracing/core/null_trace_writer.cc
@@ -18,7 +18,6 @@
 
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/utils.h"
-
 #include "perfetto/protozero/message.h"
 
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
@@ -27,7 +26,7 @@
 
 NullTraceWriter::NullTraceWriter()
     : delegate_(base::kPageSize), stream_(&delegate_) {
-  cur_packet_.reset(new protos::pbzero::TracePacket());
+  cur_packet_.reset(new protozero::RootMessage<protos::pbzero::TracePacket>());
   cur_packet_->Finalize();  // To avoid the DCHECK in NewTracePacket().
 }
 
diff --git a/src/tracing/core/null_trace_writer.h b/src/tracing/core/null_trace_writer.h
index fabbca7..4f6c707 100644
--- a/src/tracing/core/null_trace_writer.h
+++ b/src/tracing/core/null_trace_writer.h
@@ -19,6 +19,7 @@
 
 #include "perfetto/ext/tracing/core/trace_writer.h"
 #include "perfetto/protozero/message_handle.h"
+#include "perfetto/protozero/root_message.h"
 #include "perfetto/protozero/scattered_stream_null_delegate.h"
 
 namespace perfetto {
@@ -47,7 +48,8 @@
 
   // The packet returned via NewTracePacket(). Its owned by this class,
   // TracePacketHandle has just a pointer to it.
-  std::unique_ptr<protos::pbzero::TracePacket> cur_packet_;
+  std::unique_ptr<protozero::RootMessage<protos::pbzero::TracePacket>>
+      cur_packet_;
 };
 
 }  // namespace perfetto
diff --git a/src/tracing/core/trace_writer_for_testing.cc b/src/tracing/core/trace_writer_for_testing.cc
index 838f93e..37357c7 100644
--- a/src/tracing/core/trace_writer_for_testing.cc
+++ b/src/tracing/core/trace_writer_for_testing.cc
@@ -29,7 +29,7 @@
                 static_cast<size_t>(base::kPageSize)),
       stream_(&delegate_) {
   delegate_.set_writer(&stream_);
-  cur_packet_.reset(new protos::pbzero::TracePacket());
+  cur_packet_.reset(new protozero::RootMessage<protos::pbzero::TracePacket>());
   cur_packet_->Finalize();  // To avoid the DCHECK in NewTracePacket().
 }
 
diff --git a/src/tracing/core/trace_writer_for_testing.h b/src/tracing/core/trace_writer_for_testing.h
index f066fee..16c7c76 100644
--- a/src/tracing/core/trace_writer_for_testing.h
+++ b/src/tracing/core/trace_writer_for_testing.h
@@ -20,6 +20,7 @@
 
 #include "perfetto/ext/tracing/core/trace_writer.h"
 #include "perfetto/protozero/message_handle.h"
+#include "perfetto/protozero/root_message.h"
 #include "perfetto/protozero/scattered_heap_buffer.h"
 #include "protos/perfetto/trace/trace_packet.gen.h"
 
@@ -53,7 +54,8 @@
 
   // The packet returned via NewTracePacket(). Its owned by this class,
   // TracePacketHandle has just a pointer to it.
-  std::unique_ptr<protos::pbzero::TracePacket> cur_packet_;
+  std::unique_ptr<protozero::RootMessage<protos::pbzero::TracePacket>>
+      cur_packet_;
 };
 
 }  // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index f0705ae..dd0044b 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -24,7 +24,9 @@
 
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/thread_annotations.h"
+#include "perfetto/protozero/message.h"
 #include "perfetto/protozero/proto_utils.h"
+#include "perfetto/protozero/root_message.h"
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
 
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
@@ -54,7 +56,7 @@
   // more gracefully and always return a no-op TracePacket in NewTracePacket().
   PERFETTO_CHECK(id_ != 0);
 
-  cur_packet_.reset(new protos::pbzero::TracePacket());
+  cur_packet_.reset(new protozero::RootMessage<protos::pbzero::TracePacket>());
   cur_packet_->Finalize();  // To avoid the DCHECK in NewTracePacket().
 }
 
diff --git a/src/tracing/core/trace_writer_impl.h b/src/tracing/core/trace_writer_impl.h
index be2daef..9f3b970 100644
--- a/src/tracing/core/trace_writer_impl.h
+++ b/src/tracing/core/trace_writer_impl.h
@@ -24,6 +24,7 @@
 #include "perfetto/ext/tracing/core/trace_writer.h"
 #include "perfetto/protozero/message_handle.h"
 #include "perfetto/protozero/proto_utils.h"
+#include "perfetto/protozero/root_message.h"
 #include "perfetto/protozero/scattered_stream_writer.h"
 #include "perfetto/tracing/buffer_exhausted_policy.h"
 #include "src/tracing/core/patch_list.h"
@@ -92,7 +93,8 @@
 
   // The packet returned via NewTracePacket(). Its owned by this class,
   // TracePacketHandle has just a pointer to it.
-  std::unique_ptr<protos::pbzero::TracePacket> cur_packet_;
+  std::unique_ptr<protozero::RootMessage<protos::pbzero::TracePacket>>
+      cur_packet_;
 
   // The start address of |cur_packet_| within |cur_chunk_|. Used to figure out
   // fragments sizes when a TracePacket write is interrupted by GetNewBuffer().