TraceProcessor: Add ProtoRingBuffer for RPC pipe
ProtoRingBuffer helps tokenizing messages received on
the (upcoming) RPC pipe, so that the underlying stream
(JS<>Wasm postmessage or HTTP) isn't required to deal
with that.
Bug: 159142289
Test: perfetto_unittests --gtest_filter=ProtoRingBufferTest.*
Change-Id: Ic66051c49598f2667b8b32be487b6ea19b3a0353
diff --git a/Android.bp b/Android.bp
index d954988..ae3b5b3 100644
--- a/Android.bp
+++ b/Android.bp
@@ -7606,6 +7606,7 @@
filegroup {
name: "perfetto_src_trace_processor_rpc_rpc",
srcs: [
+ "src/trace_processor/rpc/proto_ring_buffer.cc",
"src/trace_processor/rpc/query_result_serializer.cc",
"src/trace_processor/rpc/rpc.cc",
],
@@ -7615,6 +7616,7 @@
filegroup {
name: "perfetto_src_trace_processor_rpc_unittests",
srcs: [
+ "src/trace_processor/rpc/proto_ring_buffer_unittest.cc",
"src/trace_processor/rpc/query_result_serializer_unittest.cc",
],
}
diff --git a/BUILD b/BUILD
index 68cead7..7dc8de9 100644
--- a/BUILD
+++ b/BUILD
@@ -973,6 +973,8 @@
filegroup(
name = "src_trace_processor_rpc_rpc",
srcs = [
+ "src/trace_processor/rpc/proto_ring_buffer.cc",
+ "src/trace_processor/rpc/proto_ring_buffer.h",
"src/trace_processor/rpc/query_result_serializer.cc",
"src/trace_processor/rpc/query_result_serializer.h",
"src/trace_processor/rpc/rpc.cc",
diff --git a/src/trace_processor/rpc/BUILD.gn b/src/trace_processor/rpc/BUILD.gn
index c81552c..63cc752 100644
--- a/src/trace_processor/rpc/BUILD.gn
+++ b/src/trace_processor/rpc/BUILD.gn
@@ -23,6 +23,8 @@
# interface) and by the :httpd module for the HTTP interface.
source_set("rpc") {
sources = [
+ "proto_ring_buffer.cc",
+ "proto_ring_buffer.h",
"query_result_serializer.cc",
"query_result_serializer.h",
"rpc.cc",
@@ -41,7 +43,10 @@
perfetto_unittest_source_set("unittests") {
testonly = true
- sources = [ "query_result_serializer_unittest.cc" ]
+ sources = [
+ "proto_ring_buffer_unittest.cc",
+ "query_result_serializer_unittest.cc",
+ ]
deps = [
":rpc",
"..:lib",
@@ -49,6 +54,7 @@
"../../../gn:gtest_and_gmock",
"../../../protos/perfetto/trace_processor:zero",
"../../base",
+ "../../protozero",
]
}
diff --git a/src/trace_processor/rpc/proto_ring_buffer.cc b/src/trace_processor/rpc/proto_ring_buffer.cc
new file mode 100644
index 0000000..3354efe
--- /dev/null
+++ b/src/trace_processor/rpc/proto_ring_buffer.cc
@@ -0,0 +1,188 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/proto_ring_buffer.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/ext/base/paged_memory.h"
+#include "perfetto/protozero/proto_utils.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+namespace {
+constexpr size_t kGrowBytes = 128 * 1024;
+
+inline ProtoRingBuffer::Message FramingError() {
+ ProtoRingBuffer::Message msg{};
+ msg.fatal_framing_error = true;
+ return msg;
+}
+
+// Tries to decode a length-delimited proto field from |start|.
+// Returns a valid boundary if the preamble is valid and the length is within
+// |end|, or an invalid message otherwise.
+ProtoRingBuffer::Message TryReadMessage(const uint8_t* start,
+ const uint8_t* end) {
+ namespace proto_utils = protozero::proto_utils;
+ uint64_t field_tag = 0;
+ auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
+ if (start_of_len == start)
+ return ProtoRingBuffer::Message{}; // Not enough data.
+
+ const uint32_t tag = field_tag & 0x07;
+ if (tag !=
+ static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
+ PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
+ return FramingError();
+ }
+
+ uint64_t msg_len = 0;
+ auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
+ if (start_of_msg == start_of_len)
+ return ProtoRingBuffer::Message{}; // Not enough data.
+
+ if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
+ PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
+ msg_len, ProtoRingBuffer::kMaxMsgSize);
+ return FramingError();
+ }
+
+ if (start_of_msg + msg_len > end)
+ return ProtoRingBuffer::Message{}; // Not enough data.
+
+ ProtoRingBuffer::Message msg{};
+ msg.start = start_of_msg;
+ msg.len = static_cast<uint32_t>(msg_len);
+ msg.field_id = static_cast<uint32_t>(field_tag >> 3);
+ return msg;
+}
+
+} // namespace
+
+ProtoRingBuffer::ProtoRingBuffer()
+ : buf_(base::PagedMemory::Allocate(kGrowBytes)) {}
+ProtoRingBuffer::~ProtoRingBuffer() = default;
+
+void ProtoRingBuffer::Append(const void* data_void, size_t data_len) {
+ if (failed_)
+ return;
+ const uint8_t* data = static_cast<const uint8_t*>(data_void);
+ PERFETTO_DCHECK(wr_ <= buf_.size());
+ PERFETTO_DCHECK(wr_ >= rd_);
+
+ // If the last call to ReadMessage() consumed all the data in the buffer and
+ // there are no incomplete messages pending, restart from the beginning rather
+ // than keep ringing. This is the most common case.
+ if (rd_ == wr_)
+ rd_ = wr_ = 0;
+
+ // The caller is expected to always issue a ReadMessage() after each Append().
+ PERFETTO_CHECK(!fastpath_.valid());
+ if (rd_ == wr_) {
+ auto msg = TryReadMessage(data, data + data_len);
+ if (msg.valid() && msg.end() == (data + data_len)) {
+ // Fastpath: in many cases, the underlying stream will effectively
+ // preserve the atomicity of messages for most small messages.
+ // In this case we can avoid the extra buf_ roundtrip and just pass a
+ // pointer to |data| + (proto preamble len).
+ // The next call to ReadMessage)= will return |fastpath_|.
+ fastpath_ = std::move(msg);
+ return;
+ }
+ }
+
+ size_t avail = buf_.size() - wr_;
+ if (data_len > avail) {
+ // This whole section should be hit extremely rare.
+
+ // Try first just recompacting the buffer by moving everything to the left.
+ // This can happen if we received "a message and a bit" on each Append call
+ // so we ended pup in a situation like:
+ // buf_: [unused space] [msg1 incomplete]
+ // ^rd_ ^wr_
+ //
+ // After recompaction:
+ // buf_: [msg1 incomplete]
+ // ^rd_ ^wr_
+ uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
+ memmove(&buf[0], &buf[rd_], wr_ - rd_);
+ avail += rd_;
+ wr_ -= rd_;
+ rd_ = 0;
+ if (data_len > avail) {
+ // The compaction didn't free up enough space and we need to expand the
+ // ring buffer. Yes, we could have detected this earlier and split the
+ // code paths, rather than first compacting and then realizing it wasn't
+ // sufficient. However, that would make the code harder to reason about,
+ // creating code paths that are nearly never hit, hence making it more
+ // likely to accumulate bugs in future. All this is very rare.
+ size_t new_size = buf_.size();
+ while (data_len > new_size - wr_)
+ new_size += kGrowBytes;
+ if (new_size > kMaxMsgSize * 2) {
+ failed_ = true;
+ return;
+ }
+ auto new_buf = base::PagedMemory::Allocate(new_size);
+ memcpy(new_buf.Get(), buf_.Get(), buf_.size());
+ buf_ = std::move(new_buf);
+ avail = new_size - wr_;
+ // No need to touch rd_ / wr_ cursors.
+ }
+ }
+
+ // Append the received data at the end of the ring buffer.
+ uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
+ memcpy(&buf[wr_], data, data_len);
+ wr_ += data_len;
+}
+
+ProtoRingBuffer::Message ProtoRingBuffer::ReadMessage() {
+ if (failed_)
+ return FramingError();
+
+ if (fastpath_.valid()) {
+ // The fastpath can only be hit when the buffer is empty.
+ PERFETTO_CHECK(rd_ == wr_);
+ auto msg = std::move(fastpath_);
+ fastpath_ = Message{};
+ return msg;
+ }
+
+ uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
+
+ PERFETTO_DCHECK(rd_ <= wr_);
+ if (rd_ >= wr_)
+ return Message{}; // Completely empty.
+
+ auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
+ if (!msg.valid()) {
+ failed_ = failed_ || msg.fatal_framing_error;
+ return msg; // Return |msg| because it could be a framing error.
+ }
+
+ // Note: msg.start is > buf[rd_], because it skips the proto preamble.
+ PERFETTO_DCHECK(msg.start > &buf[rd_]);
+ const uint8_t* msg_end = msg.start + msg.len;
+ PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
+ auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
+ rd_ += msg_outer_len;
+ return msg;
+}
+
+} // namespace trace_processor
+} // namespace perfetto
diff --git a/src/trace_processor/rpc/proto_ring_buffer.h b/src/trace_processor/rpc/proto_ring_buffer.h
new file mode 100644
index 0000000..62934ca
--- /dev/null
+++ b/src/trace_processor/rpc/proto_ring_buffer.h
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
+#define SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
+
+#include <stdint.h>
+
+#include "perfetto/ext/base/paged_memory.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+// This class buffers and tokenizes proto messages used for the TraceProcessor
+// RPC interface (See comments in trace_processor.proto).
+// From a logical level, the RPC is a sequence of protos like this.
+// [ header 1 ] [ payload 1 ]
+// [ header 2 ] [ payload 2 ]
+// [ header 3 ] [ payload 3 ]
+// Where [ header ] is a variable-length sequence of:
+// [ Field ID = 1, type = length-delimited] [ length (varint) ].
+// The RPC pipe is byte-oriented, not message-oriented (like a TCP stream).
+// The pipe is not required to respect the boundaries of each message, it only
+// guarantees that data is not lost or duplicated. The following sequence of
+// inbound events is possible:
+// 1. [ hdr 1 (incomplete) ... ]
+// 2. [ ... hdr 1 ] [ payload 1 ] [ hdr 2 ] [ payoad 2 ] [ hdr 3 ] [ pay... ]
+// 3. [ ...load 3 ]
+//
+// This class maintains inbound requests in a ring buffer.
+// The expected usage is:
+// ring_buf.Append(data, len);
+// for (;;) {
+// auto msg = ring_buf.ReadMessage();
+// if (!msg.valid())
+// break;
+// Decode(msg);
+// }
+//
+// After each call to Append, the caller is expected to call ReadMessage() until
+// it returns an invalid message (signalling no more messages could be decoded).
+// Note that a single Append can "unblock" > 1 messages, which is why the caller
+// needs to keep calling ReadMessage in a loop.
+//
+// Internal architecture
+// ---------------------
+// Internally this is similar to a ring-buffer, with the caveat that it never
+// wraps, it only expands. Expansions are rare. The deal is that in most cases
+// the read cursor follows very closely the write cursor. For instance, if the
+// uderlying behaves as a dgram socket, after each Append, the read cursor will
+// chase completely the write cursor. Even if the underyling stream is not
+// always atomic, the expectation is that the read cursor will eventually reach
+// the write one within few messages.
+// A visual example, imagine we have four messages: 2it 4will 2be 4fine
+// Visually:
+//
+// Append("2it4wi"): A message and a bit:
+// [ 2it 4wi ]
+// ^R ^W
+//
+// After the ReadMessage(), the 1st message will be read, but not the 2nd.
+// [ 2it 4wi ]
+// ^R ^W
+//
+// Append("ll2be4f")
+// [ 2it 4will 2be 4f ]
+// ^R ^W
+//
+// After the ReadMessage() loop:
+// [ 2it 4will 2be 4f ]
+// ^R ^W
+// Append("ine")
+// [ 2it 4will 2be 4fine ]
+// ^R ^W
+//
+// In the next ReadMessage() the R cursor will chase the W cursor. When this
+// happens (very frequent) we can just reset both cursors to 0 and restart.
+// If we are unlucky and get to the end of the buffer, two things happen:
+// 1. We try first to recompact the buffer, moving everything left by R.
+// 2. If still there isn't enough space, we expand the buffer.
+// Given that each message is expected to be at most kMaxMsgSize (64 MB), the
+// expansion is bound at 2 * kMaxMsgSize.
+class ProtoRingBuffer {
+ public:
+ static constexpr size_t kMaxMsgSize = 64 * 1024 * 1024;
+ struct Message {
+ const uint8_t* start = nullptr;
+ uint32_t len = 0;
+ uint32_t field_id = 0;
+ bool fatal_framing_error = false;
+ const uint8_t* end() const { return start + len; }
+ inline bool valid() const { return !!start; }
+ };
+
+ ProtoRingBuffer();
+ ~ProtoRingBuffer();
+ ProtoRingBuffer(const ProtoRingBuffer&) = delete;
+ ProtoRingBuffer& operator=(const ProtoRingBuffer&) = delete;
+
+ // Appends data into the ring buffer, recompacting or resizing it if needed.
+ // Will invaildate the pointers previously handed out.
+ void Append(const void* data, size_t len);
+
+ // If a protobuf message can be read, it returns the boundaries of the message
+ // (without including the preamble) and advances the read cursor.
+ // If no message is avaiable, returns a null range.
+ // The returned pointer is only valid until the next call to Append(), as
+ // that can recompact or resize the underlying buffer.
+ Message ReadMessage();
+
+ // Exposed for testing.
+ size_t capacity() const { return buf_.size(); }
+ size_t avail() const { return buf_.size() - (wr_ - rd_); }
+
+ private:
+ base::PagedMemory buf_;
+ Message fastpath_{};
+ bool failed_ = false; // Set in case of an unrecoverable framing faiulre.
+ size_t rd_ = 0; // Offset of the read cursor in |buf_|.
+ size_t wr_ = 0; // Offset of the write cursor in |buf_|.
+};
+
+} // namespace trace_processor
+} // namespace perfetto
+
+#endif // SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
diff --git a/src/trace_processor/rpc/proto_ring_buffer_unittest.cc b/src/trace_processor/rpc/proto_ring_buffer_unittest.cc
new file mode 100644
index 0000000..0e3944c
--- /dev/null
+++ b/src/trace_processor/rpc/proto_ring_buffer_unittest.cc
@@ -0,0 +1,232 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/proto_ring_buffer.h"
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#include <list>
+#include <ostream>
+#include <random>
+#include <vector>
+
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "test/gtest_and_gmock.h"
+
+using testing::ElementsAre;
+
+namespace perfetto {
+namespace trace_processor {
+
+// For ASSERT_EQ()
+inline bool operator==(const ProtoRingBuffer::Message& a,
+ const ProtoRingBuffer::Message& b) {
+ if (a.field_id != b.field_id || a.len != b.len || a.valid() != b.valid())
+ return false;
+ if (!a.valid())
+ return true;
+ return memcmp(a.start, b.start, a.len) == 0;
+}
+
+inline std::ostream& operator<<(std::ostream& stream,
+ const ProtoRingBuffer::Message& msg) {
+ stream << "Message{field_id:" << msg.field_id << ", len:" << msg.len;
+ stream << ", payload: \"";
+ static constexpr uint32_t kTruncLen = 16;
+ for (uint32_t i = 0; i < std::min(msg.len, kTruncLen); i++)
+ stream << static_cast<char>(msg.start[i]);
+ if (msg.len > kTruncLen)
+ stream << "...";
+ stream << "\"}";
+ return stream;
+}
+
+namespace {
+
+constexpr uint32_t kMaxMsgSize = ProtoRingBuffer::kMaxMsgSize;
+
+class ProtoRingBufferTest : public ::testing::Test {
+ public:
+ ProtoRingBuffer::Message MakeProtoMessage(uint32_t field_id,
+ uint32_t len,
+ bool append = false) {
+ ProtoRingBuffer::Message msg{};
+ namespace proto_utils = protozero::proto_utils;
+ const uint8_t* initial_ptr = last_msg_.data();
+ if (!append)
+ last_msg_.clear();
+ size_t initial_size = last_msg_.size();
+
+ // 20 is an over-estimation of the preamble (fixed by the 2nd resize below).
+ last_msg_.resize(initial_size + len + 20);
+ uint8_t* wptr = &last_msg_[initial_size];
+ auto tag = proto_utils::MakeTagLengthDelimited(field_id);
+ wptr = proto_utils::WriteVarInt(tag, wptr);
+ wptr = proto_utils::WriteVarInt(len, wptr);
+ msg.start = wptr;
+ msg.len = len;
+ msg.field_id = field_id;
+ for (uint32_t i = 0; i < len; i++)
+ *(wptr++) = '0' + ((len + i) % 73); // 73 prime for more unique patterns.
+
+ PERFETTO_CHECK(wptr <= &last_msg_.back());
+ last_msg_.resize(static_cast<size_t>(wptr - &last_msg_[0]));
+
+ // Vector must not expand, because the returned Mesdage relies on pointer
+ // stability. The TEST_F must reserve enough capacity.
+ if (append)
+ PERFETTO_CHECK(last_msg_.data() == initial_ptr);
+ return msg;
+ }
+
+ std::vector<uint8_t> last_msg_;
+};
+
+// Test that when appending buffers that contain whole messages the ring buffer
+// is skipped.
+TEST_F(ProtoRingBufferTest, Fastpath) {
+ ProtoRingBuffer buf;
+ for (uint32_t i = 0; i < 10; i++) {
+ // Write a whole message that hits the fastpath.
+ auto expected = MakeProtoMessage(/*field_id=*/i + 1, /*len=*/i * 7);
+ buf.Append(last_msg_.data(), last_msg_.size());
+ // Shouln't take any space the buffer because it hits the fastpath.
+ EXPECT_EQ(buf.avail(), buf.capacity());
+ auto actual = buf.ReadMessage();
+ ASSERT_TRUE(actual.valid());
+ EXPECT_EQ(actual.start, expected.start); // Should point to the same buf.
+ EXPECT_EQ(actual, expected);
+
+ // Now write a message in two fragments. It won't hit the fastpath
+ expected = MakeProtoMessage(/*field_id*/ 1, /*len=*/32);
+ buf.Append(last_msg_.data(), 13);
+ EXPECT_LT(buf.avail(), buf.capacity());
+ EXPECT_FALSE(buf.ReadMessage().valid());
+
+ // Append 2nd fragment.
+ buf.Append(last_msg_.data() + 13, last_msg_.size() - 13);
+ actual = buf.ReadMessage();
+ ASSERT_TRUE(actual.valid());
+ EXPECT_EQ(actual, expected);
+ }
+}
+
+TEST_F(ProtoRingBufferTest, CoalescingStream) {
+ ProtoRingBuffer buf;
+ last_msg_.reserve(1024);
+ std::list<ProtoRingBuffer::Message> expected;
+
+ // Build 6 messages of 100 bytes each (100 does not include preambles).
+ for (uint32_t i = 1; i <= 6; i++)
+ expected.emplace_back(MakeProtoMessage(i, 100, /*append=*/true));
+
+ uint32_t frag_lens[] = {120, 20, 471, 1};
+ uint32_t frag_sum = 0;
+ for (uint32_t i = 0; i < base::ArraySize(frag_lens); i++)
+ frag_sum += frag_lens[i];
+ ASSERT_EQ(frag_sum, last_msg_.size());
+
+ // Append the messages in such a way that each appen either passes a portion
+ // of a message (the 20 ones) or more than a message.
+ uint32_t written = 0;
+ for (uint32_t i = 0; i < base::ArraySize(frag_lens); i++) {
+ buf.Append(&last_msg_[written], frag_lens[i]);
+ written += frag_lens[i];
+ for (;;) {
+ auto msg = buf.ReadMessage();
+ if (!msg.valid())
+ break;
+ ASSERT_FALSE(expected.empty());
+ ASSERT_EQ(expected.front(), msg);
+ expected.pop_front();
+ }
+ }
+ EXPECT_TRUE(expected.empty());
+}
+
+TEST_F(ProtoRingBufferTest, RandomSizes) {
+ ProtoRingBuffer buf;
+ std::minstd_rand0 rnd(0);
+
+ last_msg_.reserve(1024 * 1024 * 64);
+ std::list<ProtoRingBuffer::Message> expected;
+
+ const uint32_t kNumMsg = 100;
+ for (uint32_t i = 0; i < kNumMsg; i++) {
+ uint32_t field_id = static_cast<uint32_t>(1 + (rnd() % 1024u));
+ uint32_t rndval = static_cast<uint32_t>(rnd());
+ uint32_t len = 1 + (rndval % 1024);
+ if ((rndval % 100) < 2) {
+ len *= 10 * 1024; // 2% of messages will get close to kMaxMsgSize
+ } else if ((rndval % 100) < 20) {
+ len *= 512; // 18% will be around 500K;
+ }
+ len = std::max(std::min(len, kMaxMsgSize), 1u);
+ expected.push_back(MakeProtoMessage(field_id, len, /*append=*/true));
+ }
+
+ uint32_t total = static_cast<uint32_t>(last_msg_.size());
+ for (uint32_t frag_sum = 0; frag_sum < total;) {
+ uint32_t frag_len = static_cast<uint32_t>(1 + (rnd() % 32768));
+ frag_len = std::min(frag_len, total - frag_sum);
+ buf.Append(&last_msg_[frag_sum], frag_len);
+ frag_sum += frag_len;
+ for (;;) {
+ auto msg = buf.ReadMessage();
+ if (!msg.valid())
+ break;
+ ASSERT_FALSE(expected.empty());
+ ASSERT_EQ(expected.front(), msg);
+ expected.pop_front();
+ }
+ }
+ EXPECT_TRUE(expected.empty());
+}
+
+TEST_F(ProtoRingBufferTest, HandleProtoErrorsGracefully) {
+ ProtoRingBuffer buf;
+
+ // Apppend a partial valid 32 byte message, followed by some invalild
+ // data.
+ auto expected = MakeProtoMessage(1, 32);
+ buf.Append(last_msg_.data(), last_msg_.size() - 1);
+ auto msg = buf.ReadMessage();
+ EXPECT_FALSE(msg.valid());
+ EXPECT_FALSE(msg.fatal_framing_error);
+
+ uint8_t invalid[] = {0x7f, 0x7f, 0x7f, 0x7f};
+ invalid[0] = last_msg_.back();
+ buf.Append(invalid, sizeof(invalid));
+
+ // The first message shoudl be valild
+ msg = buf.ReadMessage();
+ EXPECT_EQ(msg, expected);
+
+ // All the rest should be a framing error.
+ for (int i = 0; i < 3; i++) {
+ msg = buf.ReadMessage();
+ EXPECT_FALSE(msg.valid());
+ EXPECT_TRUE(msg.fatal_framing_error);
+
+ buf.Append(invalid, sizeof(invalid));
+ }
+}
+
+} // namespace
+} // namespace trace_processor
+} // namespace perfetto