TraceProcessor: Add ProtoRingBuffer for RPC pipe

ProtoRingBuffer helps tokenizing messages received on
the (upcoming) RPC pipe, so that the underlying stream
(JS<>Wasm postmessage or HTTP) isn't required to deal
with that.

Bug: 159142289
Test: perfetto_unittests --gtest_filter=ProtoRingBufferTest.*
Change-Id: Ic66051c49598f2667b8b32be487b6ea19b3a0353
diff --git a/Android.bp b/Android.bp
index d954988..ae3b5b3 100644
--- a/Android.bp
+++ b/Android.bp
@@ -7606,6 +7606,7 @@
 filegroup {
   name: "perfetto_src_trace_processor_rpc_rpc",
   srcs: [
+    "src/trace_processor/rpc/proto_ring_buffer.cc",
     "src/trace_processor/rpc/query_result_serializer.cc",
     "src/trace_processor/rpc/rpc.cc",
   ],
@@ -7615,6 +7616,7 @@
 filegroup {
   name: "perfetto_src_trace_processor_rpc_unittests",
   srcs: [
+    "src/trace_processor/rpc/proto_ring_buffer_unittest.cc",
     "src/trace_processor/rpc/query_result_serializer_unittest.cc",
   ],
 }
diff --git a/BUILD b/BUILD
index 68cead7..7dc8de9 100644
--- a/BUILD
+++ b/BUILD
@@ -973,6 +973,8 @@
 filegroup(
     name = "src_trace_processor_rpc_rpc",
     srcs = [
+        "src/trace_processor/rpc/proto_ring_buffer.cc",
+        "src/trace_processor/rpc/proto_ring_buffer.h",
         "src/trace_processor/rpc/query_result_serializer.cc",
         "src/trace_processor/rpc/query_result_serializer.h",
         "src/trace_processor/rpc/rpc.cc",
diff --git a/src/trace_processor/rpc/BUILD.gn b/src/trace_processor/rpc/BUILD.gn
index c81552c..63cc752 100644
--- a/src/trace_processor/rpc/BUILD.gn
+++ b/src/trace_processor/rpc/BUILD.gn
@@ -23,6 +23,8 @@
 # interface) and by the :httpd module for the HTTP interface.
 source_set("rpc") {
   sources = [
+    "proto_ring_buffer.cc",
+    "proto_ring_buffer.h",
     "query_result_serializer.cc",
     "query_result_serializer.h",
     "rpc.cc",
@@ -41,7 +43,10 @@
 
 perfetto_unittest_source_set("unittests") {
   testonly = true
-  sources = [ "query_result_serializer_unittest.cc" ]
+  sources = [
+    "proto_ring_buffer_unittest.cc",
+    "query_result_serializer_unittest.cc",
+  ]
   deps = [
     ":rpc",
     "..:lib",
@@ -49,6 +54,7 @@
     "../../../gn:gtest_and_gmock",
     "../../../protos/perfetto/trace_processor:zero",
     "../../base",
+    "../../protozero",
   ]
 }
 
diff --git a/src/trace_processor/rpc/proto_ring_buffer.cc b/src/trace_processor/rpc/proto_ring_buffer.cc
new file mode 100644
index 0000000..3354efe
--- /dev/null
+++ b/src/trace_processor/rpc/proto_ring_buffer.cc
@@ -0,0 +1,188 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/proto_ring_buffer.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/ext/base/paged_memory.h"
+#include "perfetto/protozero/proto_utils.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+namespace {
+constexpr size_t kGrowBytes = 128 * 1024;
+
+inline ProtoRingBuffer::Message FramingError() {
+  ProtoRingBuffer::Message msg{};
+  msg.fatal_framing_error = true;
+  return msg;
+}
+
+// Tries to decode a length-delimited proto field from |start|.
+// Returns a valid boundary if the preamble is valid and the length is within
+// |end|, or an invalid message otherwise.
+ProtoRingBuffer::Message TryReadMessage(const uint8_t* start,
+                                        const uint8_t* end) {
+  namespace proto_utils = protozero::proto_utils;
+  uint64_t field_tag = 0;
+  auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
+  if (start_of_len == start)
+    return ProtoRingBuffer::Message{};  // Not enough data.
+
+  const uint32_t tag = field_tag & 0x07;
+  if (tag !=
+      static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
+    PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
+    return FramingError();
+  }
+
+  uint64_t msg_len = 0;
+  auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
+  if (start_of_msg == start_of_len)
+    return ProtoRingBuffer::Message{};  // Not enough data.
+
+  if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
+    PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
+                  msg_len, ProtoRingBuffer::kMaxMsgSize);
+    return FramingError();
+  }
+
+  if (start_of_msg + msg_len > end)
+    return ProtoRingBuffer::Message{};  // Not enough data.
+
+  ProtoRingBuffer::Message msg{};
+  msg.start = start_of_msg;
+  msg.len = static_cast<uint32_t>(msg_len);
+  msg.field_id = static_cast<uint32_t>(field_tag >> 3);
+  return msg;
+}
+
+}  // namespace
+
+ProtoRingBuffer::ProtoRingBuffer()
+    : buf_(base::PagedMemory::Allocate(kGrowBytes)) {}
+ProtoRingBuffer::~ProtoRingBuffer() = default;
+
+void ProtoRingBuffer::Append(const void* data_void, size_t data_len) {
+  if (failed_)
+    return;
+  const uint8_t* data = static_cast<const uint8_t*>(data_void);
+  PERFETTO_DCHECK(wr_ <= buf_.size());
+  PERFETTO_DCHECK(wr_ >= rd_);
+
+  // If the last call to ReadMessage() consumed all the data in the buffer and
+  // there are no incomplete messages pending, restart from the beginning rather
+  // than keep ringing. This is the most common case.
+  if (rd_ == wr_)
+    rd_ = wr_ = 0;
+
+  // The caller is expected to always issue a ReadMessage() after each Append().
+  PERFETTO_CHECK(!fastpath_.valid());
+  if (rd_ == wr_) {
+    auto msg = TryReadMessage(data, data + data_len);
+    if (msg.valid() && msg.end() == (data + data_len)) {
+      // Fastpath: in many cases, the underlying stream will effectively
+      // preserve the atomicity of messages for most small messages.
+      // In this case we can avoid the extra buf_ roundtrip and just pass a
+      // pointer to |data| + (proto preamble len).
+      // The next call to ReadMessage)= will return |fastpath_|.
+      fastpath_ = std::move(msg);
+      return;
+    }
+  }
+
+  size_t avail = buf_.size() - wr_;
+  if (data_len > avail) {
+    // This whole section should be hit extremely rare.
+
+    // Try first just recompacting the buffer by moving everything to the left.
+    // This can happen if we received "a message and a bit" on each Append call
+    // so we ended pup in a situation like:
+    // buf_: [unused space] [msg1 incomplete]
+    //                      ^rd_             ^wr_
+    //
+    // After recompaction:
+    // buf_: [msg1 incomplete]
+    //       ^rd_             ^wr_
+    uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
+    memmove(&buf[0], &buf[rd_], wr_ - rd_);
+    avail += rd_;
+    wr_ -= rd_;
+    rd_ = 0;
+    if (data_len > avail) {
+      // The compaction didn't free up enough space and we need to expand the
+      // ring buffer. Yes, we could have detected this earlier and split the
+      // code paths, rather than first compacting and then realizing it wasn't
+      // sufficient. However, that would make the code harder to reason about,
+      // creating code paths that are nearly never hit, hence making it more
+      // likely to accumulate bugs in future. All this is very rare.
+      size_t new_size = buf_.size();
+      while (data_len > new_size - wr_)
+        new_size += kGrowBytes;
+      if (new_size > kMaxMsgSize * 2) {
+        failed_ = true;
+        return;
+      }
+      auto new_buf = base::PagedMemory::Allocate(new_size);
+      memcpy(new_buf.Get(), buf_.Get(), buf_.size());
+      buf_ = std::move(new_buf);
+      avail = new_size - wr_;
+      // No need to touch rd_ / wr_ cursors.
+    }
+  }
+
+  // Append the received data at the end of the ring buffer.
+  uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
+  memcpy(&buf[wr_], data, data_len);
+  wr_ += data_len;
+}
+
+ProtoRingBuffer::Message ProtoRingBuffer::ReadMessage() {
+  if (failed_)
+    return FramingError();
+
+  if (fastpath_.valid()) {
+    // The fastpath can only be hit when the buffer is empty.
+    PERFETTO_CHECK(rd_ == wr_);
+    auto msg = std::move(fastpath_);
+    fastpath_ = Message{};
+    return msg;
+  }
+
+  uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
+
+  PERFETTO_DCHECK(rd_ <= wr_);
+  if (rd_ >= wr_)
+    return Message{};  // Completely empty.
+
+  auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
+  if (!msg.valid()) {
+    failed_ = failed_ || msg.fatal_framing_error;
+    return msg;  // Return |msg| because it could be a framing error.
+  }
+
+  // Note: msg.start is > buf[rd_], because it skips the proto preamble.
+  PERFETTO_DCHECK(msg.start > &buf[rd_]);
+  const uint8_t* msg_end = msg.start + msg.len;
+  PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
+  auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
+  rd_ += msg_outer_len;
+  return msg;
+}
+
+}  // namespace trace_processor
+}  // namespace perfetto
diff --git a/src/trace_processor/rpc/proto_ring_buffer.h b/src/trace_processor/rpc/proto_ring_buffer.h
new file mode 100644
index 0000000..62934ca
--- /dev/null
+++ b/src/trace_processor/rpc/proto_ring_buffer.h
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
+#define SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
+
+#include <stdint.h>
+
+#include "perfetto/ext/base/paged_memory.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+// This class buffers and tokenizes proto messages used for the TraceProcessor
+// RPC interface (See comments in trace_processor.proto).
+// From a logical level, the RPC is a sequence of protos like this.
+// [ header 1 ] [ payload 1   ]
+// [ header 2 ] [ payload 2  ]
+// [ header 3 ] [ payload 3     ]
+// Where [ header ] is a variable-length sequence of:
+// [ Field ID = 1, type = length-delimited] [ length (varint) ].
+// The RPC pipe is byte-oriented, not message-oriented (like a TCP stream).
+// The pipe is not required to respect the boundaries of each message, it only
+// guarantees that data is not lost or duplicated. The following sequence of
+// inbound events is possible:
+// 1. [ hdr 1 (incomplete) ... ]
+// 2. [ ... hdr 1 ] [ payload 1 ] [ hdr 2 ] [ payoad 2 ] [ hdr 3 ] [ pay... ]
+// 3. [ ...load 3 ]
+//
+// This class maintains inbound requests in a ring buffer.
+// The expected usage is:
+// ring_buf.Append(data, len);
+// for (;;) {
+//   auto msg = ring_buf.ReadMessage();
+//   if (!msg.valid())
+//     break;
+//   Decode(msg);
+// }
+//
+// After each call to Append, the caller is expected to call ReadMessage() until
+// it returns an invalid message (signalling no more messages could be decoded).
+// Note that a single Append can "unblock" > 1 messages, which is why the caller
+// needs to keep calling ReadMessage in a loop.
+//
+// Internal architecture
+// ---------------------
+// Internally this is similar to a ring-buffer, with the caveat that it never
+// wraps, it only expands. Expansions are rare. The deal is that in most cases
+// the read cursor follows very closely the write cursor. For instance, if the
+// uderlying behaves as a dgram socket, after each Append, the read cursor will
+// chase completely the write cursor. Even if the underyling stream is not
+// always atomic, the expectation is that the read cursor will eventually reach
+// the write one within few messages.
+// A visual example, imagine we have four messages: 2it 4will 2be 4fine
+// Visually:
+//
+// Append("2it4wi"): A message and a bit:
+// [ 2it 4wi                     ]
+// ^R       ^W
+//
+// After the ReadMessage(), the 1st message will be read, but not the 2nd.
+// [ 2it 4wi                     ]
+//      ^R ^W
+//
+// Append("ll2be4f")
+// [ 2it 4will 2be 4f            ]
+//      ^R           ^W
+//
+// After the ReadMessage() loop:
+// [ 2it 4will 2be 4f            ]
+//                ^R ^W
+// Append("ine")
+// [ 2it 4will 2be 4fine         ]
+//                ^R    ^W
+//
+// In the next ReadMessage() the R cursor will chase the W cursor. When this
+// happens (very frequent) we can just reset both cursors to 0 and restart.
+// If we are unlucky and get to the end of the buffer, two things happen:
+// 1. We try first to recompact the buffer, moving everything left by R.
+// 2. If still there isn't enough space, we expand the buffer.
+// Given that each message is expected to be at most kMaxMsgSize (64 MB), the
+// expansion is bound at 2 * kMaxMsgSize.
+class ProtoRingBuffer {
+ public:
+  static constexpr size_t kMaxMsgSize = 64 * 1024 * 1024;
+  struct Message {
+    const uint8_t* start = nullptr;
+    uint32_t len = 0;
+    uint32_t field_id = 0;
+    bool fatal_framing_error = false;
+    const uint8_t* end() const { return start + len; }
+    inline bool valid() const { return !!start; }
+  };
+
+  ProtoRingBuffer();
+  ~ProtoRingBuffer();
+  ProtoRingBuffer(const ProtoRingBuffer&) = delete;
+  ProtoRingBuffer& operator=(const ProtoRingBuffer&) = delete;
+
+  // Appends data into the ring buffer, recompacting or resizing it if needed.
+  // Will invaildate the pointers previously handed out.
+  void Append(const void* data, size_t len);
+
+  // If a protobuf message can be read, it returns the boundaries of the message
+  // (without including the preamble) and advances the read cursor.
+  // If no message is avaiable, returns a null range.
+  // The returned pointer is only valid until the next call to Append(), as
+  // that can recompact or resize the underlying buffer.
+  Message ReadMessage();
+
+  // Exposed for testing.
+  size_t capacity() const { return buf_.size(); }
+  size_t avail() const { return buf_.size() - (wr_ - rd_); }
+
+ private:
+  base::PagedMemory buf_;
+  Message fastpath_{};
+  bool failed_ = false;  // Set in case of an unrecoverable framing faiulre.
+  size_t rd_ = 0;        // Offset of the read cursor in |buf_|.
+  size_t wr_ = 0;        // Offset of the write cursor in |buf_|.
+};
+
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
diff --git a/src/trace_processor/rpc/proto_ring_buffer_unittest.cc b/src/trace_processor/rpc/proto_ring_buffer_unittest.cc
new file mode 100644
index 0000000..0e3944c
--- /dev/null
+++ b/src/trace_processor/rpc/proto_ring_buffer_unittest.cc
@@ -0,0 +1,232 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/proto_ring_buffer.h"
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#include <list>
+#include <ostream>
+#include <random>
+#include <vector>
+
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "test/gtest_and_gmock.h"
+
+using testing::ElementsAre;
+
+namespace perfetto {
+namespace trace_processor {
+
+// For ASSERT_EQ()
+inline bool operator==(const ProtoRingBuffer::Message& a,
+                       const ProtoRingBuffer::Message& b) {
+  if (a.field_id != b.field_id || a.len != b.len || a.valid() != b.valid())
+    return false;
+  if (!a.valid())
+    return true;
+  return memcmp(a.start, b.start, a.len) == 0;
+}
+
+inline std::ostream& operator<<(std::ostream& stream,
+                                const ProtoRingBuffer::Message& msg) {
+  stream << "Message{field_id:" << msg.field_id << ", len:" << msg.len;
+  stream << ", payload: \"";
+  static constexpr uint32_t kTruncLen = 16;
+  for (uint32_t i = 0; i < std::min(msg.len, kTruncLen); i++)
+    stream << static_cast<char>(msg.start[i]);
+  if (msg.len > kTruncLen)
+    stream << "...";
+  stream << "\"}";
+  return stream;
+}
+
+namespace {
+
+constexpr uint32_t kMaxMsgSize = ProtoRingBuffer::kMaxMsgSize;
+
+class ProtoRingBufferTest : public ::testing::Test {
+ public:
+  ProtoRingBuffer::Message MakeProtoMessage(uint32_t field_id,
+                                            uint32_t len,
+                                            bool append = false) {
+    ProtoRingBuffer::Message msg{};
+    namespace proto_utils = protozero::proto_utils;
+    const uint8_t* initial_ptr = last_msg_.data();
+    if (!append)
+      last_msg_.clear();
+    size_t initial_size = last_msg_.size();
+
+    // 20 is an over-estimation of the preamble (fixed by the 2nd resize below).
+    last_msg_.resize(initial_size + len + 20);
+    uint8_t* wptr = &last_msg_[initial_size];
+    auto tag = proto_utils::MakeTagLengthDelimited(field_id);
+    wptr = proto_utils::WriteVarInt(tag, wptr);
+    wptr = proto_utils::WriteVarInt(len, wptr);
+    msg.start = wptr;
+    msg.len = len;
+    msg.field_id = field_id;
+    for (uint32_t i = 0; i < len; i++)
+      *(wptr++) = '0' + ((len + i) % 73);  // 73 prime for more unique patterns.
+
+    PERFETTO_CHECK(wptr <= &last_msg_.back());
+    last_msg_.resize(static_cast<size_t>(wptr - &last_msg_[0]));
+
+    // Vector must not expand, because the returned Mesdage relies on pointer
+    // stability. The TEST_F must reserve enough capacity.
+    if (append)
+      PERFETTO_CHECK(last_msg_.data() == initial_ptr);
+    return msg;
+  }
+
+  std::vector<uint8_t> last_msg_;
+};
+
+// Test that when appending buffers that contain whole messages the ring buffer
+// is skipped.
+TEST_F(ProtoRingBufferTest, Fastpath) {
+  ProtoRingBuffer buf;
+  for (uint32_t i = 0; i < 10; i++) {
+    // Write a whole message that hits the fastpath.
+    auto expected = MakeProtoMessage(/*field_id=*/i + 1, /*len=*/i * 7);
+    buf.Append(last_msg_.data(), last_msg_.size());
+    // Shouln't take any space the buffer because it hits the fastpath.
+    EXPECT_EQ(buf.avail(), buf.capacity());
+    auto actual = buf.ReadMessage();
+    ASSERT_TRUE(actual.valid());
+    EXPECT_EQ(actual.start, expected.start);  // Should point to the same buf.
+    EXPECT_EQ(actual, expected);
+
+    // Now write a message in two fragments. It won't hit the fastpath
+    expected = MakeProtoMessage(/*field_id*/ 1, /*len=*/32);
+    buf.Append(last_msg_.data(), 13);
+    EXPECT_LT(buf.avail(), buf.capacity());
+    EXPECT_FALSE(buf.ReadMessage().valid());
+
+    // Append 2nd fragment.
+    buf.Append(last_msg_.data() + 13, last_msg_.size() - 13);
+    actual = buf.ReadMessage();
+    ASSERT_TRUE(actual.valid());
+    EXPECT_EQ(actual, expected);
+  }
+}
+
+TEST_F(ProtoRingBufferTest, CoalescingStream) {
+  ProtoRingBuffer buf;
+  last_msg_.reserve(1024);
+  std::list<ProtoRingBuffer::Message> expected;
+
+  // Build 6 messages of 100 bytes each (100 does not include preambles).
+  for (uint32_t i = 1; i <= 6; i++)
+    expected.emplace_back(MakeProtoMessage(i, 100, /*append=*/true));
+
+  uint32_t frag_lens[] = {120, 20, 471, 1};
+  uint32_t frag_sum = 0;
+  for (uint32_t i = 0; i < base::ArraySize(frag_lens); i++)
+    frag_sum += frag_lens[i];
+  ASSERT_EQ(frag_sum, last_msg_.size());
+
+  // Append the messages in such a way that each appen either passes a portion
+  // of a message (the 20 ones) or more than a message.
+  uint32_t written = 0;
+  for (uint32_t i = 0; i < base::ArraySize(frag_lens); i++) {
+    buf.Append(&last_msg_[written], frag_lens[i]);
+    written += frag_lens[i];
+    for (;;) {
+      auto msg = buf.ReadMessage();
+      if (!msg.valid())
+        break;
+      ASSERT_FALSE(expected.empty());
+      ASSERT_EQ(expected.front(), msg);
+      expected.pop_front();
+    }
+  }
+  EXPECT_TRUE(expected.empty());
+}
+
+TEST_F(ProtoRingBufferTest, RandomSizes) {
+  ProtoRingBuffer buf;
+  std::minstd_rand0 rnd(0);
+
+  last_msg_.reserve(1024 * 1024 * 64);
+  std::list<ProtoRingBuffer::Message> expected;
+
+  const uint32_t kNumMsg = 100;
+  for (uint32_t i = 0; i < kNumMsg; i++) {
+    uint32_t field_id = static_cast<uint32_t>(1 + (rnd() % 1024u));
+    uint32_t rndval = static_cast<uint32_t>(rnd());
+    uint32_t len = 1 + (rndval % 1024);
+    if ((rndval % 100) < 2) {
+      len *= 10 * 1024;  // 2% of messages will get close to kMaxMsgSize
+    } else if ((rndval % 100) < 20) {
+      len *= 512;  // 18% will be around 500K;
+    }
+    len = std::max(std::min(len, kMaxMsgSize), 1u);
+    expected.push_back(MakeProtoMessage(field_id, len, /*append=*/true));
+  }
+
+  uint32_t total = static_cast<uint32_t>(last_msg_.size());
+  for (uint32_t frag_sum = 0; frag_sum < total;) {
+    uint32_t frag_len = static_cast<uint32_t>(1 + (rnd() % 32768));
+    frag_len = std::min(frag_len, total - frag_sum);
+    buf.Append(&last_msg_[frag_sum], frag_len);
+    frag_sum += frag_len;
+    for (;;) {
+      auto msg = buf.ReadMessage();
+      if (!msg.valid())
+        break;
+      ASSERT_FALSE(expected.empty());
+      ASSERT_EQ(expected.front(), msg);
+      expected.pop_front();
+    }
+  }
+  EXPECT_TRUE(expected.empty());
+}
+
+TEST_F(ProtoRingBufferTest, HandleProtoErrorsGracefully) {
+  ProtoRingBuffer buf;
+
+  // Apppend a partial valid 32 byte message, followed by some invalild
+  // data.
+  auto expected = MakeProtoMessage(1, 32);
+  buf.Append(last_msg_.data(), last_msg_.size() - 1);
+  auto msg = buf.ReadMessage();
+  EXPECT_FALSE(msg.valid());
+  EXPECT_FALSE(msg.fatal_framing_error);
+
+  uint8_t invalid[] = {0x7f, 0x7f, 0x7f, 0x7f};
+  invalid[0] = last_msg_.back();
+  buf.Append(invalid, sizeof(invalid));
+
+  // The first message shoudl be valild
+  msg = buf.ReadMessage();
+  EXPECT_EQ(msg, expected);
+
+  // All the rest should be a framing error.
+  for (int i = 0; i < 3; i++) {
+    msg = buf.ReadMessage();
+    EXPECT_FALSE(msg.valid());
+    EXPECT_TRUE(msg.fatal_framing_error);
+
+    buf.Append(invalid, sizeof(invalid));
+  }
+}
+
+}  // namespace
+}  // namespace trace_processor
+}  // namespace perfetto