tp: extract out tokenizing logic from ProtoTraceTokenizer

This CL does three things:
1. Renames ProtoTraceTokenizer -> ProtoTraceReader to better reflect its
   true purpose.
2. Extracts out all the actual tokenization from ProtoTraceReader into a
   new ProtoTraceTokenizer class which can be reused. This includes
   handling any decompression of compressed packets.
3. Updates all usage of ProtoTraceTokenizer to ProtoTraceReader.

This is needed to support handling gzip traces in DecompressTrace
without duplicating really subtle logic.

Bug: 167975440
Change-Id: I4d14aa011663b9cbbac4672a63f18f9c08e7201d
diff --git a/Android.bp b/Android.bp
index 13ad6d5..6b00865 100644
--- a/Android.bp
+++ b/Android.bp
@@ -6971,6 +6971,7 @@
     "src/trace_processor/importers/proto/profiler_util.cc",
     "src/trace_processor/importers/proto/proto_importer_module.cc",
     "src/trace_processor/importers/proto/proto_trace_parser.cc",
+    "src/trace_processor/importers/proto/proto_trace_reader.cc",
     "src/trace_processor/importers/proto/proto_trace_tokenizer.cc",
     "src/trace_processor/importers/proto/stack_profile_tracker.cc",
     "src/trace_processor/importers/proto/track_event_module.cc",
diff --git a/BUILD b/BUILD
index 9200701..6efdde8 100644
--- a/BUILD
+++ b/BUILD
@@ -1092,6 +1092,8 @@
         "src/trace_processor/importers/proto/proto_incremental_state.h",
         "src/trace_processor/importers/proto/proto_trace_parser.cc",
         "src/trace_processor/importers/proto/proto_trace_parser.h",
+        "src/trace_processor/importers/proto/proto_trace_reader.cc",
+        "src/trace_processor/importers/proto/proto_trace_reader.h",
         "src/trace_processor/importers/proto/proto_trace_tokenizer.cc",
         "src/trace_processor/importers/proto/proto_trace_tokenizer.h",
         "src/trace_processor/importers/proto/stack_profile_tracker.cc",
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index f58fa5c..671557a 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -105,6 +105,8 @@
     "importers/proto/proto_incremental_state.h",
     "importers/proto/proto_trace_parser.cc",
     "importers/proto/proto_trace_parser.h",
+    "importers/proto/proto_trace_reader.cc",
+    "importers/proto/proto_trace_reader.h",
     "importers/proto/proto_trace_tokenizer.cc",
     "importers/proto/proto_trace_tokenizer.h",
     "importers/proto/stack_profile_tracker.cc",
diff --git a/src/trace_processor/chunked_trace_reader.h b/src/trace_processor/chunked_trace_reader.h
index 0d487dc..8dbdd4f 100644
--- a/src/trace_processor/chunked_trace_reader.h
+++ b/src/trace_processor/chunked_trace_reader.h
@@ -29,7 +29,7 @@
 namespace trace_processor {
 
 // Base interface for first stage of parsing pipeline
-// (JsonTraceParser, ProtoTraceTokenizer).
+// (JsonTraceParser, ProtoTraceReader).
 class ChunkedTraceReader {
  public:
   virtual ~ChunkedTraceReader();
diff --git a/src/trace_processor/forwarding_trace_parser.cc b/src/trace_processor/forwarding_trace_parser.cc
index de61850..153c842 100644
--- a/src/trace_processor/forwarding_trace_parser.cc
+++ b/src/trace_processor/forwarding_trace_parser.cc
@@ -21,7 +21,7 @@
 #include "src/trace_processor/importers/common/process_tracker.h"
 #include "src/trace_processor/importers/ninja/ninja_log_parser.h"
 #include "src/trace_processor/importers/proto/proto_trace_parser.h"
-#include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
+#include "src/trace_processor/importers/proto/proto_trace_reader.h"
 #include "src/trace_processor/trace_sorter.h"
 
 namespace perfetto {
@@ -81,7 +81,7 @@
         PERFETTO_DLOG("Proto trace detected");
         // This will be reduced once we read the trace config and we see flush
         // period being set.
-        reader_.reset(new ProtoTraceTokenizer(context_));
+        reader_.reset(new ProtoTraceReader(context_));
         context_->sorter.reset(new TraceSorter(
             std::unique_ptr<TraceParser>(new ProtoTraceParser(context_)),
             kMaxWindowSize));
diff --git a/src/trace_processor/importers/common/track_tracker.h b/src/trace_processor/importers/common/track_tracker.h
index 0d3189f..b98a38a 100644
--- a/src/trace_processor/importers/common/track_tracker.h
+++ b/src/trace_processor/importers/common/track_tracker.h
@@ -184,7 +184,7 @@
       uint32_t packet_sequence_id,
       int64_t value);
 
-  // Called by ProtoTraceTokenizer whenever incremental state is cleared on a
+  // Called by ProtoTraceReader whenever incremental state is cleared on a
   // packet sequence. Resets counter values for any incremental counters of
   // the sequence identified by |packet_sequence_id|.
   void OnIncrementalStateCleared(uint32_t packet_sequence_id);
diff --git a/src/trace_processor/importers/proto/proto_importer_module.h b/src/trace_processor/importers/proto/proto_importer_module.h
index 909eb14..74cc958 100644
--- a/src/trace_processor/importers/proto/proto_importer_module.h
+++ b/src/trace_processor/importers/proto/proto_importer_module.h
@@ -36,7 +36,7 @@
 struct TimestampedTracePiece;
 class TraceProcessorContext;
 
-// This file contains a base class for ProtoTraceTokenizer/Parser modules.
+// This file contains a base class for ProtoTraceReader/Parser modules.
 // A module implements support for a subset of features of the TracePacket
 // proto format.
 // To add and integrate a new module:
@@ -98,7 +98,7 @@
 
   virtual ~ProtoImporterModule();
 
-  // Called by ProtoTraceTokenizer during the tokenization stage, i.e. before
+  // Called by ProtoTraceReader during the tokenization stage, i.e. before
   // sorting. It's called for each TracePacket that contains fields for which
   // the module was registered. If this returns a result other than
   // ModuleResult::Ignored(), tokenization of the packet will be aborted after
diff --git a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
index 2663455..ed2b7d1 100644
--- a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
+++ b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
+#include "src/trace_processor/importers/proto/proto_trace_reader.h"
 
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/string_view.h"
@@ -243,7 +243,7 @@
     std::vector<uint8_t> trace_bytes = trace_.SerializeAsArray();
     std::unique_ptr<uint8_t[]> raw_trace(new uint8_t[trace_bytes.size()]);
     memcpy(raw_trace.get(), trace_bytes.data(), trace_bytes.size());
-    context_.chunk_reader.reset(new ProtoTraceTokenizer(&context_));
+    context_.chunk_reader.reset(new ProtoTraceReader(&context_));
     auto status =
         context_.chunk_reader->Parse(std::move(raw_trace), trace_bytes.size());
 
diff --git a/src/trace_processor/importers/proto/proto_trace_reader.cc b/src/trace_processor/importers/proto/proto_trace_reader.cc
new file mode 100644
index 0000000..5e12987
--- /dev/null
+++ b/src/trace_processor/importers/proto/proto_trace_reader.cc
@@ -0,0 +1,372 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/importers/proto/proto_trace_reader.h"
+
+#include <string>
+
+#include "perfetto/base/build_config.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/string_view.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/protozero/proto_decoder.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "perfetto/trace_processor/status.h"
+#include "src/trace_processor/importers/common/clock_tracker.h"
+#include "src/trace_processor/importers/common/event_tracker.h"
+#include "src/trace_processor/importers/common/track_tracker.h"
+#include "src/trace_processor/importers/ftrace/ftrace_module.h"
+#include "src/trace_processor/importers/gzip/gzip_utils.h"
+#include "src/trace_processor/importers/proto/args_table_utils.h"
+#include "src/trace_processor/importers/proto/metadata_tracker.h"
+#include "src/trace_processor/importers/proto/packet_sequence_state.h"
+#include "src/trace_processor/importers/proto/proto_incremental_state.h"
+#include "src/trace_processor/storage/stats.h"
+#include "src/trace_processor/storage/trace_storage.h"
+#include "src/trace_processor/trace_sorter.h"
+
+#include "protos/perfetto/common/builtin_clock.pbzero.h"
+#include "protos/perfetto/config/trace_config.pbzero.h"
+#include "protos/perfetto/trace/clock_snapshot.pbzero.h"
+#include "protos/perfetto/trace/extension_descriptor.pbzero.h"
+#include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
+#include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
+#include "protos/perfetto/trace/trace.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
+    : context_(ctx) {}
+ProtoTraceReader::~ProtoTraceReader() = default;
+
+util::Status ProtoTraceReader::Parse(std::unique_ptr<uint8_t[]> owned_buf,
+                                     size_t size) {
+  return tokenizer_.Tokenize(
+      std::move(owned_buf), size,
+      [this](TraceBlobView packet) { return ParsePacket(std::move(packet)); });
+}
+
+util::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
+  protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
+                                                       descriptor.size);
+
+  auto extension = decoder.extension_set();
+  return context_->proto_to_args_table_->AddProtoFileDescriptor(extension.data,
+                                                                extension.size);
+}
+
+util::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
+  protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
+  if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
+    return util::ErrStatus(
+        "Failed to parse proto packet fully; the trace is probably corrupt.");
+  }
+
+  // Any compressed packets should have been handled by the tokenizer.
+  PERFETTO_CHECK(!decoder.has_compressed_packets());
+
+  const uint32_t seq_id = decoder.trusted_packet_sequence_id();
+  auto* state = GetIncrementalStateForPacketSequence(seq_id);
+
+  uint32_t sequence_flags = decoder.sequence_flags();
+
+  if (decoder.incremental_state_cleared() ||
+      sequence_flags &
+          protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
+    HandleIncrementalStateCleared(decoder);
+  } else if (decoder.previous_packet_dropped()) {
+    HandlePreviousPacketDropped(decoder);
+  }
+
+  // It is important that we parse defaults before parsing other fields such as
+  // the timestamp, since the defaults could affect them.
+  if (decoder.has_trace_packet_defaults()) {
+    auto field = decoder.trace_packet_defaults();
+    const size_t offset = packet.offset_of(field.data);
+    ParseTracePacketDefaults(decoder, packet.slice(offset, field.size));
+  }
+
+  if (decoder.has_interned_data()) {
+    auto field = decoder.interned_data();
+    const size_t offset = packet.offset_of(field.data);
+    ParseInternedData(decoder, packet.slice(offset, field.size));
+  }
+
+  if (decoder.has_clock_snapshot()) {
+    return ParseClockSnapshot(decoder.clock_snapshot(),
+                              decoder.trusted_packet_sequence_id());
+  }
+
+  if (decoder.has_service_event()) {
+    PERFETTO_DCHECK(decoder.has_timestamp());
+    int64_t ts = static_cast<int64_t>(decoder.timestamp());
+    return ParseServiceEvent(ts, decoder.service_event());
+  }
+
+  if (decoder.has_extension_descriptor()) {
+    return ParseExtensionDescriptor(decoder.extension_descriptor());
+  }
+
+  if (decoder.sequence_flags() &
+      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
+    if (!seq_id) {
+      return util::ErrStatus(
+          "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
+          "TraceWriter's sequence_id is zero (the service is "
+          "probably too old)");
+    }
+
+    if (!state->IsIncrementalStateValid()) {
+      context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
+      return util::OkStatus();
+    }
+  }
+
+  protos::pbzero::TracePacketDefaults::Decoder* defaults =
+      state->current_generation()->GetTracePacketDefaults();
+
+  int64_t timestamp;
+  if (decoder.has_timestamp()) {
+    timestamp = static_cast<int64_t>(decoder.timestamp());
+
+    uint32_t timestamp_clock_id =
+        decoder.has_timestamp_clock_id()
+            ? decoder.timestamp_clock_id()
+            : (defaults ? defaults->timestamp_clock_id() : 0);
+
+    if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
+        (!timestamp_clock_id ||
+         timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
+      // Chrome event timestamps are in MONOTONIC domain, but may occur in
+      // traces where (a) no clock snapshots exist or (b) no clock_id is
+      // specified for their timestamps. Adjust to trace time if we have a clock
+      // snapshot.
+      // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
+      // chrome and then remove this.
+      auto trace_ts = context_->clock_tracker->ToTraceTime(
+          protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
+      if (trace_ts.has_value())
+        timestamp = trace_ts.value();
+    } else if (timestamp_clock_id) {
+      // If the TracePacket specifies a non-zero clock-id, translate the
+      // timestamp into the trace-time clock domain.
+      ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
+      bool is_seq_scoped =
+          ClockTracker::IsReservedSeqScopedClockId(converted_clock_id);
+      if (is_seq_scoped) {
+        if (!seq_id) {
+          return util::ErrStatus(
+              "TracePacket specified a sequence-local clock id (%" PRIu32
+              ") but the TraceWriter's sequence_id is zero (the service is "
+              "probably too old)",
+              timestamp_clock_id);
+        }
+        converted_clock_id =
+            ClockTracker::SeqScopedClockIdToGlobal(seq_id, timestamp_clock_id);
+      }
+      auto trace_ts =
+          context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
+      if (!trace_ts.has_value()) {
+        // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
+        static const char seq_extra_err[] =
+            " Because the clock id is sequence-scoped, the ClockSnapshot must "
+            "be emitted on the same TraceWriter sequence of the packet that "
+            "refers to that clock id.";
+        return util::ErrStatus(
+            "Failed to convert TracePacket's timestamp from clock_id=%" PRIu32
+            " seq_id=%" PRIu32
+            ". This is usually due to the lack of a prior ClockSnapshot "
+            "proto.%s",
+            timestamp_clock_id, seq_id, is_seq_scoped ? seq_extra_err : "");
+      }
+      timestamp = trace_ts.value();
+    }
+  } else {
+    timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
+  }
+  latest_timestamp_ = std::max(timestamp, latest_timestamp_);
+
+  auto& modules = context_->modules_by_field;
+  for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
+    if (modules[field_id] && decoder.Get(field_id).valid()) {
+      ModuleResult res = modules[field_id]->TokenizePacket(
+          decoder, &packet, timestamp, state, field_id);
+      if (!res.ignored())
+        return res.ToStatus();
+    }
+  }
+
+  // If we're not forcing a full sort and this is a write_into_file trace, then
+  // use flush_period_ms as an indiciator for how big the sliding window for the
+  // sorter should be.
+  if (!context_->config.force_full_sort && decoder.has_trace_config()) {
+    auto config = decoder.trace_config();
+    protos::pbzero::TraceConfig::Decoder trace_config(config.data, config.size);
+
+    if (trace_config.write_into_file()) {
+      int64_t window_size_ns;
+      if (trace_config.has_flush_period_ms() &&
+          trace_config.flush_period_ms() > 0) {
+        // We use 2x the flush period as a margin of error to allow for any
+        // late flush responses to still be sorted correctly.
+        window_size_ns = static_cast<int64_t>(trace_config.flush_period_ms()) *
+                         2 * 1000 * 1000;
+      } else {
+        constexpr uint64_t kDefaultWindowNs =
+            180 * 1000 * 1000 * 1000ULL;  // 3 minutes.
+        PERFETTO_ELOG(
+            "It is strongly recommended to have flush_period_ms set when "
+            "write_into_file is turned on. You will likely have many dropped "
+            "events because of inability to sort the events correctly.");
+        window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
+      }
+      context_->sorter->SetWindowSizeNs(window_size_ns);
+    }
+  }
+
+  // Use parent data and length because we want to parse this again
+  // later to get the exact type of the packet.
+  context_->sorter->PushTracePacket(timestamp, state, std::move(packet));
+
+  return util::OkStatus();
+}
+
+void ProtoTraceReader::HandleIncrementalStateCleared(
+    const protos::pbzero::TracePacket::Decoder& packet_decoder) {
+  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
+    PERFETTO_ELOG(
+        "incremental_state_cleared without trusted_packet_sequence_id");
+    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
+    return;
+  }
+  GetIncrementalStateForPacketSequence(
+      packet_decoder.trusted_packet_sequence_id())
+      ->OnIncrementalStateCleared();
+  context_->track_tracker->OnIncrementalStateCleared(
+      packet_decoder.trusted_packet_sequence_id());
+}
+
+void ProtoTraceReader::HandlePreviousPacketDropped(
+    const protos::pbzero::TracePacket::Decoder& packet_decoder) {
+  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
+    PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
+    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
+    return;
+  }
+  GetIncrementalStateForPacketSequence(
+      packet_decoder.trusted_packet_sequence_id())
+      ->OnPacketLoss();
+}
+
+void ProtoTraceReader::ParseTracePacketDefaults(
+    const protos::pbzero::TracePacket_Decoder& packet_decoder,
+    TraceBlobView trace_packet_defaults) {
+  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
+    PERFETTO_ELOG(
+        "TracePacketDefaults packet without trusted_packet_sequence_id");
+    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
+    return;
+  }
+
+  auto* state = GetIncrementalStateForPacketSequence(
+      packet_decoder.trusted_packet_sequence_id());
+  state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
+}
+
+void ProtoTraceReader::ParseInternedData(
+    const protos::pbzero::TracePacket::Decoder& packet_decoder,
+    TraceBlobView interned_data) {
+  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
+    PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
+    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
+    return;
+  }
+
+  auto* state = GetIncrementalStateForPacketSequence(
+      packet_decoder.trusted_packet_sequence_id());
+
+  // Don't parse interned data entries until incremental state is valid, because
+  // they could otherwise be associated with the wrong generation in the state.
+  if (!state->IsIncrementalStateValid()) {
+    context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
+    return;
+  }
+
+  // Store references to interned data submessages into the sequence's state.
+  protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
+  for (protozero::Field f = decoder.ReadField(); f.valid();
+       f = decoder.ReadField()) {
+    auto bytes = f.as_bytes();
+    auto offset = interned_data.offset_of(bytes.data);
+    state->InternMessage(f.id(), interned_data.slice(offset, bytes.size));
+  }
+}
+
+util::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
+                                                  uint32_t seq_id) {
+  std::vector<ClockTracker::ClockValue> clocks;
+  protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
+  if (evt.primary_trace_clock()) {
+    context_->clock_tracker->SetTraceTimeClock(
+        static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
+  }
+  for (auto it = evt.clocks(); it; ++it) {
+    protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
+    ClockTracker::ClockId clock_id = clk.clock_id();
+    if (ClockTracker::IsReservedSeqScopedClockId(clk.clock_id())) {
+      if (!seq_id) {
+        return util::ErrStatus(
+            "ClockSnapshot packet is specifying a sequence-scoped clock id "
+            "(%" PRIu64 ") but the TracePacket sequence_id is zero",
+            clock_id);
+      }
+      clock_id = ClockTracker::SeqScopedClockIdToGlobal(seq_id, clk.clock_id());
+    }
+    int64_t unit_multiplier_ns =
+        clk.unit_multiplier_ns()
+            ? static_cast<int64_t>(clk.unit_multiplier_ns())
+            : 1;
+    clocks.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
+                        clk.is_incremental());
+  }
+  context_->clock_tracker->AddSnapshot(clocks);
+  return util::OkStatus();
+}
+
+util::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
+  protos::pbzero::TracingServiceEvent::Decoder tse(blob);
+  if (tse.tracing_started()) {
+    context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
+                                            Variadic::Integer(ts));
+  }
+  if (tse.tracing_disabled()) {
+    context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
+                                            Variadic::Integer(ts));
+  }
+  if (tse.all_data_sources_started()) {
+    context_->metadata_tracker->SetMetadata(
+        metadata::all_data_source_started_ns, Variadic::Integer(ts));
+  }
+  return util::OkStatus();
+}
+
+void ProtoTraceReader::NotifyEndOfFile() {}
+
+}  // namespace trace_processor
+}  // namespace perfetto
diff --git a/src/trace_processor/importers/proto/proto_trace_reader.h b/src/trace_processor/importers/proto/proto_trace_reader.h
new file mode 100644
index 0000000..4091f1a
--- /dev/null
+++ b/src/trace_processor/importers/proto/proto_trace_reader.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_IMPORTERS_PROTO_PROTO_TRACE_READER_H_
+#define SRC_TRACE_PROCESSOR_IMPORTERS_PROTO_PROTO_TRACE_READER_H_
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "src/trace_processor/chunked_trace_reader.h"
+#include "src/trace_processor/importers/proto/proto_incremental_state.h"
+#include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
+#include "src/trace_processor/trace_blob_view.h"
+
+namespace protozero {
+struct ConstBytes;
+}
+
+namespace perfetto {
+
+namespace protos {
+namespace pbzero {
+class TracePacket_Decoder;
+}  // namespace pbzero
+}  // namespace protos
+
+namespace trace_processor {
+
+class PacketSequenceState;
+class TraceProcessorContext;
+class TraceSorter;
+class TraceStorage;
+
+// Implementation of ChunkedTraceReader for proto traces. Tokenizes a proto
+// trace into packets, handles parsing of any packets which need to be
+// handled in trace-order and passes the remainder to TraceSorter to sort
+// into timestamp order.
+class ProtoTraceReader : public ChunkedTraceReader {
+ public:
+  // |reader| is the abstract method of getting chunks of size |chunk_size_b|
+  // from a trace file with these chunks parsed into |trace|.
+  explicit ProtoTraceReader(TraceProcessorContext*);
+  ~ProtoTraceReader() override;
+
+  // ChunkedTraceReader implementation.
+  util::Status Parse(std::unique_ptr<uint8_t[]>, size_t size) override;
+  void NotifyEndOfFile() override;
+
+ private:
+  using ConstBytes = protozero::ConstBytes;
+  util::Status ParsePacket(TraceBlobView);
+  util::Status ParseServiceEvent(int64_t ts, ConstBytes);
+  util::Status ParseClockSnapshot(ConstBytes blob, uint32_t seq_id);
+  void HandleIncrementalStateCleared(
+      const protos::pbzero::TracePacket_Decoder&);
+  void HandlePreviousPacketDropped(const protos::pbzero::TracePacket_Decoder&);
+  void ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder&,
+                                TraceBlobView trace_packet_defaults);
+  void ParseInternedData(const protos::pbzero::TracePacket_Decoder&,
+                         TraceBlobView interned_data);
+  PacketSequenceState* GetIncrementalStateForPacketSequence(
+      uint32_t sequence_id) {
+    if (!incremental_state)
+      incremental_state.reset(new ProtoIncrementalState(context_));
+    return incremental_state->GetOrCreateStateForPacketSequence(sequence_id);
+  }
+  util::Status ParseExtensionDescriptor(ConstBytes descriptor);
+
+  TraceProcessorContext* context_;
+
+  ProtoTraceTokenizer tokenizer_;
+
+  // Temporary. Currently trace packets do not have a timestamp, so the
+  // timestamp given is latest_timestamp_.
+  int64_t latest_timestamp_ = 0;
+
+  // Stores incremental state and references to interned data, e.g. for track
+  // event protos.
+  std::unique_ptr<ProtoIncrementalState> incremental_state;
+};
+
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_IMPORTERS_PROTO_PROTO_TRACE_READER_H_
diff --git a/src/trace_processor/importers/proto/proto_trace_tokenizer.cc b/src/trace_processor/importers/proto/proto_trace_tokenizer.cc
index 7f41b9d..874fb91 100644
--- a/src/trace_processor/importers/proto/proto_trace_tokenizer.cc
+++ b/src/trace_processor/importers/proto/proto_trace_tokenizer.cc
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2018 The Android Open Source Project
+ * Copyright (C) 2020 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,50 +16,13 @@
 
 #include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
 
-#include <string>
-
-#include "perfetto/base/build_config.h"
-#include "perfetto/base/logging.h"
-#include "perfetto/ext/base/optional.h"
-#include "perfetto/ext/base/string_view.h"
 #include "perfetto/ext/base/utils.h"
-#include "perfetto/protozero/proto_decoder.h"
-#include "perfetto/protozero/proto_utils.h"
-#include "perfetto/trace_processor/status.h"
-#include "src/trace_processor/importers/common/clock_tracker.h"
-#include "src/trace_processor/importers/common/event_tracker.h"
-#include "src/trace_processor/importers/common/track_tracker.h"
-#include "src/trace_processor/importers/ftrace/ftrace_module.h"
-#include "src/trace_processor/importers/gzip/gzip_utils.h"
-#include "src/trace_processor/importers/proto/args_table_utils.h"
-#include "src/trace_processor/importers/proto/metadata_tracker.h"
-#include "src/trace_processor/importers/proto/packet_sequence_state.h"
-#include "src/trace_processor/importers/proto/proto_incremental_state.h"
-#include "src/trace_processor/storage/stats.h"
-#include "src/trace_processor/storage/trace_storage.h"
-#include "src/trace_processor/trace_sorter.h"
-
-#include "protos/perfetto/common/builtin_clock.pbzero.h"
-#include "protos/perfetto/config/trace_config.pbzero.h"
-#include "protos/perfetto/trace/clock_snapshot.pbzero.h"
-#include "protos/perfetto/trace/extension_descriptor.pbzero.h"
-#include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
-#include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
-#include "protos/perfetto/trace/trace.pbzero.h"
-#include "protos/perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
 namespace trace_processor {
 
-using protozero::proto_utils::MakeTagLengthDelimited;
-using protozero::proto_utils::ParseVarInt;
-
-namespace {
-
-constexpr uint8_t kTracePacketTag =
-    MakeTagLengthDelimited(protos::pbzero::Trace::kPacketFieldNumber);
-
-TraceBlobView Decompress(GzipDecompressor* decompressor, TraceBlobView input) {
+util::Status ProtoTraceTokenizer::Decompress(TraceBlobView input,
+                                             TraceBlobView* output) {
   PERFETTO_DCHECK(gzip::IsGzipSupported());
 
   uint8_t out[4096];
@@ -68,457 +31,27 @@
   data.reserve(input.length());
 
   // Ensure that the decompressor is able to cope with a new stream of data.
-  decompressor->Reset();
-  decompressor->SetInput(input.data(), input.length());
+  decompressor_.Reset();
+  decompressor_.SetInput(input.data(), input.length());
 
   using ResultCode = GzipDecompressor::ResultCode;
   for (auto ret = ResultCode::kOk; ret != ResultCode::kEof;) {
-    auto res = decompressor->Decompress(out, base::ArraySize(out));
+    auto res = decompressor_.Decompress(out, base::ArraySize(out));
     ret = res.ret;
     if (ret == ResultCode::kError || ret == ResultCode::kNoProgress ||
-        ret == ResultCode::kNeedsMoreInput)
-      return TraceBlobView(nullptr, 0, 0);
+        ret == ResultCode::kNeedsMoreInput) {
+      return util::ErrStatus("Failed to decompress (error code: %d)",
+                             static_cast<int>(ret));
+    }
 
     data.insert(data.end(), out, out + res.bytes_written);
   }
 
-  std::unique_ptr<uint8_t[]> output(new uint8_t[data.size()]);
-  memcpy(output.get(), data.data(), data.size());
-  return TraceBlobView(std::move(output), 0, data.size());
-}
-
-}  // namespace
-
-ProtoTraceTokenizer::ProtoTraceTokenizer(TraceProcessorContext* ctx)
-    : context_(ctx) {}
-ProtoTraceTokenizer::~ProtoTraceTokenizer() = default;
-
-util::Status ProtoTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> owned_buf,
-                                        size_t size) {
-  uint8_t* data = &owned_buf[0];
-  if (!partial_buf_.empty()) {
-    // It takes ~5 bytes for a proto preamble + the varint size.
-    const size_t kHeaderBytes = 5;
-    if (PERFETTO_UNLIKELY(partial_buf_.size() < kHeaderBytes)) {
-      size_t missing_len = std::min(kHeaderBytes - partial_buf_.size(), size);
-      partial_buf_.insert(partial_buf_.end(), &data[0], &data[missing_len]);
-      if (partial_buf_.size() < kHeaderBytes)
-        return util::OkStatus();
-      data += missing_len;
-      size -= missing_len;
-    }
-
-    // At this point we have enough data in |partial_buf_| to read at least the
-    // field header and know the size of the next TracePacket.
-    const uint8_t* pos = &partial_buf_[0];
-    uint8_t proto_field_tag = *pos;
-    uint64_t field_size = 0;
-    const uint8_t* next = ParseVarInt(++pos, &*partial_buf_.end(), &field_size);
-    bool parse_failed = next == pos;
-    pos = next;
-    if (proto_field_tag != kTracePacketTag || field_size == 0 || parse_failed) {
-      return util::ErrStatus(
-          "Failed parsing a TracePacket from the partial buffer");
-    }
-
-    // At this point we know how big the TracePacket is.
-    size_t hdr_size = static_cast<size_t>(pos - &partial_buf_[0]);
-    size_t size_incl_header = static_cast<size_t>(field_size + hdr_size);
-    PERFETTO_DCHECK(size_incl_header > partial_buf_.size());
-
-    // There is a good chance that between the |partial_buf_| and the new |data|
-    // of the current call we have enough bytes to parse a TracePacket.
-    if (partial_buf_.size() + size >= size_incl_header) {
-      // Create a new buffer for the whole TracePacket and copy into that:
-      // 1) The beginning of the TracePacket (including the proto header) from
-      //    the partial buffer.
-      // 2) The rest of the TracePacket from the current |data| buffer (note
-      //    that we might have consumed already a few bytes form |data| earlier
-      //    in this function, hence we need to keep |off| into account).
-      std::unique_ptr<uint8_t[]> buf(new uint8_t[size_incl_header]);
-      memcpy(&buf[0], partial_buf_.data(), partial_buf_.size());
-      // |size_missing| is the number of bytes for the rest of the TracePacket
-      // in |data|.
-      size_t size_missing = size_incl_header - partial_buf_.size();
-      memcpy(&buf[partial_buf_.size()], &data[0], size_missing);
-      data += size_missing;
-      size -= size_missing;
-      partial_buf_.clear();
-      uint8_t* buf_start = &buf[0];  // Note that buf is std::moved below.
-      util::Status status =
-          ParseInternal(std::move(buf), buf_start, size_incl_header);
-      if (PERFETTO_UNLIKELY(!status.ok()))
-        return status;
-    } else {
-      partial_buf_.insert(partial_buf_.end(), data, &data[size]);
-      return util::OkStatus();
-    }
-  }
-  return ParseInternal(std::move(owned_buf), data, size);
-}
-
-util::Status ProtoTraceTokenizer::ParseInternal(
-    std::unique_ptr<uint8_t[]> owned_buf,
-    uint8_t* data,
-    size_t size) {
-  PERFETTO_DCHECK(data >= &owned_buf[0]);
-  const uint8_t* start = &owned_buf[0];
-  const size_t data_off = static_cast<size_t>(data - start);
-  TraceBlobView whole_buf(std::move(owned_buf), data_off, size);
-
-  protos::pbzero::Trace::Decoder decoder(data, size);
-  for (auto it = decoder.packet(); it; ++it) {
-    protozero::ConstBytes packet = *it;
-    size_t field_offset = whole_buf.offset_of(packet.data);
-    util::Status status =
-        ParsePacket(whole_buf.slice(field_offset, packet.size));
-    if (PERFETTO_UNLIKELY(!status.ok()))
-      return status;
-  }
-
-  const size_t bytes_left = decoder.bytes_left();
-  if (bytes_left > 0) {
-    PERFETTO_DCHECK(partial_buf_.empty());
-    partial_buf_.insert(partial_buf_.end(), &data[decoder.read_offset()],
-                        &data[decoder.read_offset() + bytes_left]);
-  }
+  std::unique_ptr<uint8_t[]> out_data(new uint8_t[data.size()]);
+  memcpy(out_data.get(), data.data(), data.size());
+  *output = TraceBlobView(std::move(out_data), 0, data.size());
   return util::OkStatus();
 }
 
-util::Status ProtoTraceTokenizer::ParseExtensionDescriptor(
-    ConstBytes descriptor) {
-  protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
-                                                       descriptor.size);
-
-  auto extension = decoder.extension_set();
-  return context_->proto_to_args_table_->AddProtoFileDescriptor(extension.data,
-                                                                extension.size);
-}
-
-util::Status ProtoTraceTokenizer::ParsePacket(TraceBlobView packet) {
-  protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
-  if (PERFETTO_UNLIKELY(decoder.bytes_left()))
-    return util::ErrStatus(
-        "Failed to parse proto packet fully; the trace is probably corrupt.");
-
-  const uint32_t seq_id = decoder.trusted_packet_sequence_id();
-  auto* state = GetIncrementalStateForPacketSequence(seq_id);
-
-  uint32_t sequence_flags = decoder.sequence_flags();
-
-  if (decoder.incremental_state_cleared() ||
-      sequence_flags &
-          protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
-    HandleIncrementalStateCleared(decoder);
-  } else if (decoder.previous_packet_dropped()) {
-    HandlePreviousPacketDropped(decoder);
-  }
-
-  // It is important that we parse defaults before parsing other fields such as
-  // the timestamp, since the defaults could affect them.
-  if (decoder.has_trace_packet_defaults()) {
-    auto field = decoder.trace_packet_defaults();
-    const size_t offset = packet.offset_of(field.data);
-    ParseTracePacketDefaults(decoder, packet.slice(offset, field.size));
-  }
-
-  if (decoder.has_interned_data()) {
-    auto field = decoder.interned_data();
-    const size_t offset = packet.offset_of(field.data);
-    ParseInternedData(decoder, packet.slice(offset, field.size));
-  }
-
-  if (decoder.has_clock_snapshot()) {
-    return ParseClockSnapshot(decoder.clock_snapshot(),
-                              decoder.trusted_packet_sequence_id());
-  }
-
-  if (decoder.has_service_event()) {
-    PERFETTO_DCHECK(decoder.has_timestamp());
-    int64_t ts = static_cast<int64_t>(decoder.timestamp());
-    return ParseServiceEvent(ts, decoder.service_event());
-  }
-
-  if (decoder.has_extension_descriptor()) {
-    return ParseExtensionDescriptor(decoder.extension_descriptor());
-  }
-
-  if (decoder.sequence_flags() &
-      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
-    if (!seq_id) {
-      return util::ErrStatus(
-          "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
-          "TraceWriter's sequence_id is zero (the service is "
-          "probably too old)");
-    }
-
-    if (!state->IsIncrementalStateValid()) {
-      context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
-      return util::OkStatus();
-    }
-  }
-
-  protos::pbzero::TracePacketDefaults::Decoder* defaults =
-      state->current_generation()->GetTracePacketDefaults();
-
-  int64_t timestamp;
-  if (decoder.has_timestamp()) {
-    timestamp = static_cast<int64_t>(decoder.timestamp());
-
-    uint32_t timestamp_clock_id =
-        decoder.has_timestamp_clock_id()
-            ? decoder.timestamp_clock_id()
-            : (defaults ? defaults->timestamp_clock_id() : 0);
-
-    if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
-        (!timestamp_clock_id ||
-         timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
-      // Chrome event timestamps are in MONOTONIC domain, but may occur in
-      // traces where (a) no clock snapshots exist or (b) no clock_id is
-      // specified for their timestamps. Adjust to trace time if we have a clock
-      // snapshot.
-      // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
-      // chrome and then remove this.
-      auto trace_ts = context_->clock_tracker->ToTraceTime(
-          protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
-      if (trace_ts.has_value())
-        timestamp = trace_ts.value();
-    } else if (timestamp_clock_id) {
-      // If the TracePacket specifies a non-zero clock-id, translate the
-      // timestamp into the trace-time clock domain.
-      ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
-      bool is_seq_scoped =
-          ClockTracker::IsReservedSeqScopedClockId(converted_clock_id);
-      if (is_seq_scoped) {
-        if (!seq_id) {
-          return util::ErrStatus(
-              "TracePacket specified a sequence-local clock id (%" PRIu32
-              ") but the TraceWriter's sequence_id is zero (the service is "
-              "probably too old)",
-              timestamp_clock_id);
-        }
-        converted_clock_id =
-            ClockTracker::SeqScopedClockIdToGlobal(seq_id, timestamp_clock_id);
-      }
-      auto trace_ts =
-          context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
-      if (!trace_ts.has_value()) {
-        // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
-        static const char seq_extra_err[] =
-            " Because the clock id is sequence-scoped, the ClockSnapshot must "
-            "be emitted on the same TraceWriter sequence of the packet that "
-            "refers to that clock id.";
-        return util::ErrStatus(
-            "Failed to convert TracePacket's timestamp from clock_id=%" PRIu32
-            " seq_id=%" PRIu32
-            ". This is usually due to the lack of a prior ClockSnapshot "
-            "proto.%s",
-            timestamp_clock_id, seq_id, is_seq_scoped ? seq_extra_err : "");
-      }
-      timestamp = trace_ts.value();
-    }
-  } else {
-    timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
-  }
-  latest_timestamp_ = std::max(timestamp, latest_timestamp_);
-
-  auto& modules = context_->modules_by_field;
-  for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
-    if (modules[field_id] && decoder.Get(field_id).valid()) {
-      ModuleResult res = modules[field_id]->TokenizePacket(
-          decoder, &packet, timestamp, state, field_id);
-      if (!res.ignored())
-        return res.ToStatus();
-    }
-  }
-
-  if (decoder.has_compressed_packets()) {
-    if (!gzip::IsGzipSupported())
-      return util::Status("Cannot decode compressed packets. Zlib not enabled");
-
-    protozero::ConstBytes field = decoder.compressed_packets();
-    const size_t field_off = packet.offset_of(field.data);
-    TraceBlobView compressed_packets = packet.slice(field_off, field.size);
-    TraceBlobView packets =
-        Decompress(&decompressor_, std::move(compressed_packets));
-
-    const uint8_t* start = packets.data();
-    const uint8_t* end = packets.data() + packets.length();
-    const uint8_t* ptr = start;
-    while ((end - ptr) > 2) {
-      const uint8_t* packet_start = ptr;
-      if (PERFETTO_UNLIKELY(*ptr != kTracePacketTag))
-        return util::ErrStatus("Expected TracePacket tag");
-      uint64_t packet_size = 0;
-      ptr = ParseVarInt(++ptr, end, &packet_size);
-      size_t packet_offset = static_cast<size_t>(ptr - start);
-      ptr += packet_size;
-      if (PERFETTO_UNLIKELY((ptr - packet_start) < 2 || ptr > end))
-        return util::ErrStatus("Invalid packet size");
-      util::Status status = ParsePacket(
-          packets.slice(packet_offset, static_cast<size_t>(packet_size)));
-      if (PERFETTO_UNLIKELY(!status.ok()))
-        return status;
-    }
-    return util::OkStatus();
-  }
-
-  // If we're not forcing a full sort and this is a write_into_file trace, then
-  // use flush_period_ms as an indiciator for how big the sliding window for the
-  // sorter should be.
-  if (!context_->config.force_full_sort && decoder.has_trace_config()) {
-    auto config = decoder.trace_config();
-    protos::pbzero::TraceConfig::Decoder trace_config(config.data, config.size);
-
-    if (trace_config.write_into_file()) {
-      int64_t window_size_ns;
-      if (trace_config.has_flush_period_ms() &&
-          trace_config.flush_period_ms() > 0) {
-        // We use 2x the flush period as a margin of error to allow for any
-        // late flush responses to still be sorted correctly.
-        window_size_ns = static_cast<int64_t>(trace_config.flush_period_ms()) *
-                         2 * 1000 * 1000;
-      } else {
-        constexpr uint64_t kDefaultWindowNs =
-            180 * 1000 * 1000 * 1000ULL;  // 3 minutes.
-        PERFETTO_ELOG(
-            "It is strongly recommended to have flush_period_ms set when "
-            "write_into_file is turned on. You will likely have many dropped "
-            "events because of inability to sort the events correctly.");
-        window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
-      }
-      context_->sorter->SetWindowSizeNs(window_size_ns);
-    }
-  }
-
-  // Use parent data and length because we want to parse this again
-  // later to get the exact type of the packet.
-  context_->sorter->PushTracePacket(timestamp, state, std::move(packet));
-
-  return util::OkStatus();
-}
-
-void ProtoTraceTokenizer::HandleIncrementalStateCleared(
-    const protos::pbzero::TracePacket::Decoder& packet_decoder) {
-  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
-    PERFETTO_ELOG(
-        "incremental_state_cleared without trusted_packet_sequence_id");
-    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
-    return;
-  }
-  GetIncrementalStateForPacketSequence(
-      packet_decoder.trusted_packet_sequence_id())
-      ->OnIncrementalStateCleared();
-  context_->track_tracker->OnIncrementalStateCleared(
-      packet_decoder.trusted_packet_sequence_id());
-}
-
-void ProtoTraceTokenizer::HandlePreviousPacketDropped(
-    const protos::pbzero::TracePacket::Decoder& packet_decoder) {
-  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
-    PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
-    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
-    return;
-  }
-  GetIncrementalStateForPacketSequence(
-      packet_decoder.trusted_packet_sequence_id())
-      ->OnPacketLoss();
-}
-
-void ProtoTraceTokenizer::ParseTracePacketDefaults(
-    const protos::pbzero::TracePacket_Decoder& packet_decoder,
-    TraceBlobView trace_packet_defaults) {
-  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
-    PERFETTO_ELOG(
-        "TracePacketDefaults packet without trusted_packet_sequence_id");
-    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
-    return;
-  }
-
-  auto* state = GetIncrementalStateForPacketSequence(
-      packet_decoder.trusted_packet_sequence_id());
-  state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
-}
-
-void ProtoTraceTokenizer::ParseInternedData(
-    const protos::pbzero::TracePacket::Decoder& packet_decoder,
-    TraceBlobView interned_data) {
-  if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
-    PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
-    context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
-    return;
-  }
-
-  auto* state = GetIncrementalStateForPacketSequence(
-      packet_decoder.trusted_packet_sequence_id());
-
-  // Don't parse interned data entries until incremental state is valid, because
-  // they could otherwise be associated with the wrong generation in the state.
-  if (!state->IsIncrementalStateValid()) {
-    context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
-    return;
-  }
-
-  // Store references to interned data submessages into the sequence's state.
-  protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
-  for (protozero::Field f = decoder.ReadField(); f.valid();
-       f = decoder.ReadField()) {
-    auto bytes = f.as_bytes();
-    auto offset = interned_data.offset_of(bytes.data);
-    state->InternMessage(f.id(), interned_data.slice(offset, bytes.size));
-  }
-}
-
-util::Status ProtoTraceTokenizer::ParseClockSnapshot(ConstBytes blob,
-                                                     uint32_t seq_id) {
-  std::vector<ClockTracker::ClockValue> clocks;
-  protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
-  if (evt.primary_trace_clock()) {
-    context_->clock_tracker->SetTraceTimeClock(
-        static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
-  }
-  for (auto it = evt.clocks(); it; ++it) {
-    protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
-    ClockTracker::ClockId clock_id = clk.clock_id();
-    if (ClockTracker::IsReservedSeqScopedClockId(clk.clock_id())) {
-      if (!seq_id) {
-        return util::ErrStatus(
-            "ClockSnapshot packet is specifying a sequence-scoped clock id "
-            "(%" PRIu64 ") but the TracePacket sequence_id is zero",
-            clock_id);
-      }
-      clock_id = ClockTracker::SeqScopedClockIdToGlobal(seq_id, clk.clock_id());
-    }
-    int64_t unit_multiplier_ns =
-        clk.unit_multiplier_ns()
-            ? static_cast<int64_t>(clk.unit_multiplier_ns())
-            : 1;
-    clocks.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
-                        clk.is_incremental());
-  }
-  context_->clock_tracker->AddSnapshot(clocks);
-  return util::OkStatus();
-}
-
-util::Status ProtoTraceTokenizer::ParseServiceEvent(int64_t ts,
-                                                    ConstBytes blob) {
-  protos::pbzero::TracingServiceEvent::Decoder tse(blob);
-  if (tse.tracing_started()) {
-    context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
-                                            Variadic::Integer(ts));
-  }
-  if (tse.tracing_disabled()) {
-    context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
-                                            Variadic::Integer(ts));
-  }
-  if (tse.all_data_sources_started()) {
-    context_->metadata_tracker->SetMetadata(
-        metadata::all_data_source_started_ns, Variadic::Integer(ts));
-  }
-  return util::OkStatus();
-}
-
-void ProtoTraceTokenizer::NotifyEndOfFile() {}
-
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/importers/proto/proto_trace_tokenizer.h b/src/trace_processor/importers/proto/proto_trace_tokenizer.h
index ba94d75..5039e48 100644
--- a/src/trace_processor/importers/proto/proto_trace_tokenizer.h
+++ b/src/trace_processor/importers/proto/proto_trace_tokenizer.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2018 The Android Open Source Project
+ * Copyright (C) 2020 The Android Open Source Project
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,85 +17,169 @@
 #ifndef SRC_TRACE_PROCESSOR_IMPORTERS_PROTO_PROTO_TRACE_TOKENIZER_H_
 #define SRC_TRACE_PROCESSOR_IMPORTERS_PROTO_PROTO_TRACE_TOKENIZER_H_
 
-#include <stdint.h>
-
-#include <memory>
 #include <vector>
 
-#include "src/trace_processor/chunked_trace_reader.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "perfetto/trace_processor/status.h"
 #include "src/trace_processor/importers/gzip/gzip_utils.h"
-#include "src/trace_processor/importers/proto/proto_incremental_state.h"
 #include "src/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/util/status_macros.h"
 
-namespace protozero {
-struct ConstBytes;
-}
+#include "protos/perfetto/trace/trace.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
-
-namespace protos {
-namespace pbzero {
-class TracePacket_Decoder;
-}  // namespace pbzero
-}  // namespace protos
-
 namespace trace_processor {
 
-class PacketSequenceState;
-class TraceProcessorContext;
-class TraceSorter;
-class TraceStorage;
-
 // Reads a protobuf trace in chunks and extracts boundaries of trace packets
 // (or subfields, for the case of ftrace) with their timestamps.
-class ProtoTraceTokenizer : public ChunkedTraceReader {
+class ProtoTraceTokenizer {
  public:
-  // |reader| is the abstract method of getting chunks of size |chunk_size_b|
-  // from a trace file with these chunks parsed into |trace|.
-  explicit ProtoTraceTokenizer(TraceProcessorContext*);
-  ~ProtoTraceTokenizer() override;
+  template <typename Callback = util::Status(TraceBlobView)>
+  util::Status Tokenize(std::unique_ptr<uint8_t[]> owned_buf,
+                        size_t size,
+                        Callback callback) {
+    uint8_t* data = &owned_buf[0];
+    if (!partial_buf_.empty()) {
+      // It takes ~5 bytes for a proto preamble + the varint size.
+      const size_t kHeaderBytes = 5;
+      if (PERFETTO_UNLIKELY(partial_buf_.size() < kHeaderBytes)) {
+        size_t missing_len = std::min(kHeaderBytes - partial_buf_.size(), size);
+        partial_buf_.insert(partial_buf_.end(), &data[0], &data[missing_len]);
+        if (partial_buf_.size() < kHeaderBytes)
+          return util::OkStatus();
+        data += missing_len;
+        size -= missing_len;
+      }
 
-  // ChunkedTraceReader implementation.
-  util::Status Parse(std::unique_ptr<uint8_t[]>, size_t size) override;
-  void NotifyEndOfFile() override;
+      // At this point we have enough data in |partial_buf_| to read at least
+      // the field header and know the size of the next TracePacket.
+      const uint8_t* pos = &partial_buf_[0];
+      uint8_t proto_field_tag = *pos;
+      uint64_t field_size = 0;
+      const uint8_t* next = protozero::proto_utils::ParseVarInt(
+          ++pos, &*partial_buf_.end(), &field_size);
+      bool parse_failed = next == pos;
+      pos = next;
+      if (proto_field_tag != kTracePacketTag || field_size == 0 ||
+          parse_failed) {
+        return util::ErrStatus(
+            "Failed parsing a TracePacket from the partial buffer");
+      }
+
+      // At this point we know how big the TracePacket is.
+      size_t hdr_size = static_cast<size_t>(pos - &partial_buf_[0]);
+      size_t size_incl_header = static_cast<size_t>(field_size + hdr_size);
+      PERFETTO_DCHECK(size_incl_header > partial_buf_.size());
+
+      // There is a good chance that between the |partial_buf_| and the new
+      // |data| of the current call we have enough bytes to parse a TracePacket.
+      if (partial_buf_.size() + size >= size_incl_header) {
+        // Create a new buffer for the whole TracePacket and copy into that:
+        // 1) The beginning of the TracePacket (including the proto header) from
+        //    the partial buffer.
+        // 2) The rest of the TracePacket from the current |data| buffer (note
+        //    that we might have consumed already a few bytes form |data|
+        //    earlier in this function, hence we need to keep |off| into
+        //    account).
+        std::unique_ptr<uint8_t[]> buf(new uint8_t[size_incl_header]);
+        memcpy(&buf[0], partial_buf_.data(), partial_buf_.size());
+        // |size_missing| is the number of bytes for the rest of the TracePacket
+        // in |data|.
+        size_t size_missing = size_incl_header - partial_buf_.size();
+        memcpy(&buf[partial_buf_.size()], &data[0], size_missing);
+        data += size_missing;
+        size -= size_missing;
+        partial_buf_.clear();
+        uint8_t* buf_start = &buf[0];  // Note that buf is std::moved below.
+        RETURN_IF_ERROR(ParseInternal(std::move(buf), buf_start,
+                                      size_incl_header, callback));
+      } else {
+        partial_buf_.insert(partial_buf_.end(), data, &data[size]);
+        return util::OkStatus();
+      }
+    }
+    return ParseInternal(std::move(owned_buf), data, size, callback);
+  }
 
  private:
-  using ConstBytes = protozero::ConstBytes;
+  static constexpr uint8_t kTracePacketTag =
+      protozero::proto_utils::MakeTagLengthDelimited(
+          protos::pbzero::Trace::kPacketFieldNumber);
+
+  template <typename Callback = util::Status(TraceBlobView)>
   util::Status ParseInternal(std::unique_ptr<uint8_t[]> owned_buf,
                              uint8_t* data,
-                             size_t size);
-  util::Status ParsePacket(TraceBlobView);
-  util::Status ParseServiceEvent(int64_t ts, ConstBytes);
-  util::Status ParseClockSnapshot(ConstBytes blob, uint32_t seq_id);
-  void HandleIncrementalStateCleared(
-      const protos::pbzero::TracePacket_Decoder&);
-  void HandlePreviousPacketDropped(const protos::pbzero::TracePacket_Decoder&);
-  void ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder&,
-                                TraceBlobView trace_packet_defaults);
-  void ParseInternedData(const protos::pbzero::TracePacket_Decoder&,
-                         TraceBlobView interned_data);
-  PacketSequenceState* GetIncrementalStateForPacketSequence(
-      uint32_t sequence_id) {
-    if (!incremental_state)
-      incremental_state.reset(new ProtoIncrementalState(context_));
-    return incremental_state->GetOrCreateStateForPacketSequence(sequence_id);
-  }
-  util::Status ParseExtensionDescriptor(ConstBytes descriptor);
+                             size_t size,
+                             Callback callback) {
+    PERFETTO_DCHECK(data >= &owned_buf[0]);
+    const uint8_t* start = &owned_buf[0];
+    const size_t data_off = static_cast<size_t>(data - start);
+    TraceBlobView whole_buf(std::move(owned_buf), data_off, size);
 
-  TraceProcessorContext* context_;
+    protos::pbzero::Trace::Decoder decoder(data, size);
+    for (auto it = decoder.packet(); it; ++it) {
+      protozero::ConstBytes packet = *it;
+      size_t field_offset = whole_buf.offset_of(packet.data);
+      TraceBlobView sliced = whole_buf.slice(field_offset, packet.size);
+      RETURN_IF_ERROR(ParsePacket(std::move(sliced), callback));
+    }
+
+    const size_t bytes_left = decoder.bytes_left();
+    if (bytes_left > 0) {
+      PERFETTO_DCHECK(partial_buf_.empty());
+      partial_buf_.insert(partial_buf_.end(), &data[decoder.read_offset()],
+                          &data[decoder.read_offset() + bytes_left]);
+    }
+    return util::OkStatus();
+  }
+
+  template <typename Callback = util::Status(TraceBlobView)>
+  util::Status ParsePacket(TraceBlobView packet, Callback callback) {
+    protos::pbzero::TracePacket::Decoder decoder(packet.data(),
+                                                 packet.length());
+    if (decoder.has_compressed_packets()) {
+      if (!gzip::IsGzipSupported()) {
+        return util::Status(
+            "Cannot decode compressed packets. Zlib not enabled");
+      }
+
+      protozero::ConstBytes field = decoder.compressed_packets();
+      const size_t field_off = packet.offset_of(field.data);
+      TraceBlobView compressed_packets = packet.slice(field_off, field.size);
+      TraceBlobView packets(nullptr, 0, 0);
+
+      RETURN_IF_ERROR(Decompress(std::move(compressed_packets), &packets));
+
+      const uint8_t* start = packets.data();
+      const uint8_t* end = packets.data() + packets.length();
+      const uint8_t* ptr = start;
+      while ((end - ptr) > 2) {
+        const uint8_t* packet_start = ptr;
+        if (PERFETTO_UNLIKELY(*ptr != kTracePacketTag))
+          return util::ErrStatus("Expected TracePacket tag");
+        uint64_t packet_size = 0;
+        ptr = protozero::proto_utils::ParseVarInt(++ptr, end, &packet_size);
+        size_t packet_offset = static_cast<size_t>(ptr - start);
+        ptr += packet_size;
+        if (PERFETTO_UNLIKELY((ptr - packet_start) < 2 || ptr > end))
+          return util::ErrStatus("Invalid packet size");
+
+        TraceBlobView sliced =
+            packets.slice(packet_offset, static_cast<size_t>(packet_size));
+        RETURN_IF_ERROR(ParsePacket(std::move(sliced), callback));
+      }
+      return util::OkStatus();
+    }
+    return callback(std::move(packet));
+  }
+
+  util::Status Decompress(TraceBlobView input, TraceBlobView* output);
 
   // Used to glue together trace packets that span across two (or more)
   // Parse() boundaries.
   std::vector<uint8_t> partial_buf_;
 
-  // Temporary. Currently trace packets do not have a timestamp, so the
-  // timestamp given is latest_timestamp_.
-  int64_t latest_timestamp_ = 0;
-
-  // Stores incremental state and references to interned data, e.g. for track
-  // event protos.
-  std::unique_ptr<ProtoIncrementalState> incremental_state;
-
   // Allows support for compressed trace packets.
   GzipDecompressor decompressor_;
 };
diff --git a/src/trace_processor/importers/proto/track_event_tokenizer.cc b/src/trace_processor/importers/proto/track_event_tokenizer.cc
index a09f4e2..ec9c8ff 100644
--- a/src/trace_processor/importers/proto/track_event_tokenizer.cc
+++ b/src/trace_processor/importers/proto/track_event_tokenizer.cc
@@ -21,7 +21,7 @@
 #include "src/trace_processor/importers/common/process_tracker.h"
 #include "src/trace_processor/importers/common/track_tracker.h"
 #include "src/trace_processor/importers/proto/packet_sequence_state.h"
-#include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
+#include "src/trace_processor/importers/proto/proto_trace_reader.h"
 #include "src/trace_processor/storage/stats.h"
 #include "src/trace_processor/storage/trace_storage.h"
 #include "src/trace_processor/trace_blob_view.h"
@@ -147,7 +147,7 @@
         track.uuid(), track.parent_uuid(), name_id);
   }
 
-  // Let ProtoTraceTokenizer forward the packet to the parser.
+  // Let ProtoTraceReader forward the packet to the parser.
   return ModuleResult::Ignored();
 }
 
@@ -173,7 +173,7 @@
   protos::pbzero::ThreadDescriptor::Decoder thread(packet.thread_descriptor());
   TokenizeThreadDescriptor(state, thread);
 
-  // Let ProtoTraceTokenizer forward the packet to the parser.
+  // Let ProtoTraceReader forward the packet to the parser.
   return ModuleResult::Ignored();
 }
 
diff --git a/src/trace_processor/trace_processor_storage_impl.cc b/src/trace_processor/trace_processor_storage_impl.cc
index 7379b55..9eea8df 100644
--- a/src/trace_processor/trace_processor_storage_impl.cc
+++ b/src/trace_processor/trace_processor_storage_impl.cc
@@ -30,7 +30,7 @@
 #include "src/trace_processor/importers/proto/heap_profile_tracker.h"
 #include "src/trace_processor/importers/proto/metadata_tracker.h"
 #include "src/trace_processor/importers/proto/proto_importer_module.h"
-#include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
+#include "src/trace_processor/importers/proto/proto_trace_reader.h"
 #include "src/trace_processor/importers/proto/stack_profile_tracker.h"
 #include "src/trace_processor/trace_blob_view.h"
 #include "src/trace_processor/trace_sorter.h"