tp: rework windowed sorting to be packet offset based

This CL rewrites the core windowed sorting logic of trace sorter to be
based around packet offsets and flush/readbuffer events rather than time
ranges derived from the config.

This reduces the likelihood of things like suspend and drifting clocks
from messing up the sorting and causing out of order events.

Doc: go/trace-sorting-is-complicated
Change-Id: I9e3022cc459014df81746c21fc1f15ab7cc714a3
Bug: 192694346
diff --git a/CHANGELOG b/CHANGELOG
index 0861716..e63367e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -11,6 +11,12 @@
       new-line when specifying a query file with -q to trace processor shell.
     * Added "ancestor_slice_by_stack" and "descendant_slice_by_stack" table
       functions to walk up and down the slice stacks.
+    * Overhauled windowed sorting to be based on packet ordering and
+      lifecycle events inside the trace instead of time-based ordering.
+    * Deprecated |SortingMode::kForceFlushPeriodWindowedSort| due to
+      windowed sorting changes. Embedders should switch to
+      |SortingMode::kDefaultHeuristics|; this option will be removed
+      in v21.
   UI:
     *
   SDK:
diff --git a/include/perfetto/trace_processor/basic_types.h b/include/perfetto/trace_processor/basic_types.h
index 05735b9..5b459d0 100644
--- a/include/perfetto/trace_processor/basic_types.h
+++ b/include/perfetto/trace_processor/basic_types.h
@@ -40,11 +40,12 @@
 constexpr char kMetricProtoRoot[] = "protos/perfetto/metrics/";
 
 // Enum which encodes how trace processor should try to sort the ingested data.
+// Note that these options are only applicable to proto traces; other trace
+// types (e.g. JSON, Fuchsia) use full sorts.
 enum class SortingMode {
   // This option allows trace processor to use built-in heuristics about how to
   // sort the data. Generally, this option is correct for most embedders as
-  // trace processor reads information from the trace (e.g. TraceConfig) to make
-  // the best decision.
+  // trace processor reads information from the trace to make the best decision.
   //
   // The exact heuristics are implementation details but will ensure that all
   // relevant tables are sorted by timestamp.
@@ -58,14 +59,19 @@
   // data to be skipped.
   kForceFullSort = 1,
 
-  // This option forces trace processor to use the |flush_period_ms| specified
-  // in the TraceConfig to perform a windowed sort of the data. The window size
-  // is not guaranteed to be exactly |flush_period_ms| but will be of the same
-  // order of magnitude; the exact value is an implementation detail and should
-  // not be relied upon.
+  // This option is deprecated in v18; trace processor will ignore it and
+  // use |kDefaultHeuristics|.
   //
-  // If a |flush_period_ms| is not specified in the TraceConfig, this mode will
-  // act the same as |SortingMode::kDefaultHeuristics|.
+  // Rationale for deprecation:
+  // The new windowed sorting logic in trace processor uses a combination of
+  // flush and buffer-read lifecycle events inside the trace instead of
+  // using time-periods from the config.
+  //
+  // Recommended migration:
+  // Users of this option should switch to using |kDefaultHeuristics| which
+  // will act very similarly to the pre-v20 behaviour of this option.
+  //
+  // This option is scheduled to be removed in v21.
   kForceFlushPeriodWindowedSort = 2
 };
 
diff --git a/src/trace_processor/forwarding_trace_parser.cc b/src/trace_processor/forwarding_trace_parser.cc
index 94bcc67..dc2cf54 100644
--- a/src/trace_processor/forwarding_trace_parser.cc
+++ b/src/trace_processor/forwarding_trace_parser.cc
@@ -40,6 +40,17 @@
   return str;
 }
 
+TraceSorter::SortingMode ConvertSortingMode(SortingMode sorting_mode) {
+  switch (sorting_mode) {
+    case SortingMode::kDefaultHeuristics:
+    case SortingMode::kForceFlushPeriodWindowedSort:
+      return TraceSorter::SortingMode::kDefault;
+    case SortingMode::kForceFullSort:
+      return TraceSorter::SortingMode::kFullSort;
+  }
+  PERFETTO_FATAL("For GCC");
+}
+
 // Fuchsia traces have a magic number as documented here:
 // https://fuchsia.googlesource.com/fuchsia/+/HEAD/docs/development/tracing/trace-format/README.md#magic-number-record-trace-info-type-0
 constexpr uint64_t kFuchsiaMagicNumber = 0x0016547846040010;
@@ -55,7 +66,6 @@
                                           size_t size) {
   // If this is the first Parse() call, guess the trace type and create the
   // appropriate parser.
-  static const int64_t kMaxWindowSize = std::numeric_limits<int64_t>::max();
   if (!reader_) {
     TraceType trace_type;
     {
@@ -70,8 +80,9 @@
           reader_ = std::move(context_->json_trace_tokenizer);
 
           // JSON traces have no guarantees about the order of events in them.
-          context_->sorter.reset(new TraceSorter(
-              std::move(context_->json_trace_parser), kMaxWindowSize));
+          context_->sorter.reset(
+              new TraceSorter(context_, std::move(context_->json_trace_parser),
+                              TraceSorter::SortingMode::kFullSort));
         } else {
           return util::ErrStatus("JSON support is disabled");
         }
@@ -79,12 +90,12 @@
       }
       case kProtoTraceType: {
         PERFETTO_DLOG("Proto trace detected");
-        // This will be reduced once we read the trace config and we see flush
-        // period being set.
+        auto sorting_mode = ConvertSortingMode(context_->config.sorting_mode);
         reader_.reset(new ProtoTraceReader(context_));
         context_->sorter.reset(new TraceSorter(
+            context_,
             std::unique_ptr<TraceParser>(new ProtoTraceParser(context_)),
-            kMaxWindowSize));
+            sorting_mode));
         context_->process_tracker->SetPidZeroIgnoredForIdleProcess();
         break;
       }
@@ -101,7 +112,8 @@
 
           // Fuschia traces can have massively out of order events.
           context_->sorter.reset(new TraceSorter(
-              std::move(context_->fuchsia_trace_parser), kMaxWindowSize));
+              context_, std::move(context_->fuchsia_trace_parser),
+              TraceSorter::SortingMode::kFullSort));
         } else {
           return util::ErrStatus("Fuchsia support is disabled");
         }
diff --git a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
index 5aa9e38..7253e8a 100644
--- a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
+++ b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
@@ -93,7 +93,6 @@
     size_t off = bundle.offset_of(it->data());
     TokenizeFtraceEvent(cpu, clock_id, bundle.slice(off, it->size()), state);
   }
-  context_->sorter->FinalizeFtraceEventBatch(cpu);
   return base::OkStatus();
 }
 
diff --git a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
index c0f11cd..60c1606 100644
--- a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
+++ b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
@@ -252,7 +252,8 @@
     context_.flow_tracker.reset(flow_);
     clock_ = new ClockTracker(&context_);
     context_.clock_tracker.reset(clock_);
-    context_.sorter.reset(new TraceSorter(CreateParser(), 0 /*window size*/));
+    context_.sorter.reset(new TraceSorter(&context_, CreateParser(),
+                                          TraceSorter::SortingMode::kFullSort));
     context_.descriptor_pool_.reset(new DescriptorPool());
 
     RegisterDefaultModules(&context_);
@@ -333,6 +334,7 @@
               PushSchedSwitch(10, 1000, 10, base::StringView(kProc2Name), 256,
                               32, 100, base::StringView(kProc1Name), 1024));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadEventsIntoRaw) {
@@ -364,6 +366,7 @@
   EXPECT_CALL(*process_, GetOrCreateProcess(123));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   const auto& raw = context_.storage->raw_table();
   ASSERT_EQ(raw.row_count(), 2u);
@@ -414,6 +417,7 @@
   field->set_uint_value(3);
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   const auto& raw = storage_->raw_table();
 
@@ -481,6 +485,7 @@
                               32, 10, base::StringView(kProcName2), 512));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadMultiplePackets) {
@@ -526,6 +531,7 @@
               PushSchedSwitch(10, 1001, 100, base::StringView(kProcName1), 256,
                               32, 10, base::StringView(kProcName2), 512));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, RepeatedLoadSinglePacket) {
@@ -548,6 +554,7 @@
               PushSchedSwitch(10, 1000, 10, base::StringView(kProcName2), 256,
                               32, 100, base::StringView(kProcName1), 1024));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   bundle = trace_->add_packet()->set_ftrace_events();
   bundle->set_cpu(10);
@@ -567,6 +574,7 @@
               PushSchedSwitch(10, 1001, 100, base::StringView(kProcName1), 256,
                               32, 10, base::StringView(kProcName2), 512));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadCpuFreq) {
@@ -581,6 +589,7 @@
 
   EXPECT_CALL(*event_, PushCounter(1000, DoubleEq(2000), TrackId{0}));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   EXPECT_EQ(context_.storage->cpu_counter_track_table().cpu()[0], 10u);
 }
@@ -598,6 +607,7 @@
   EXPECT_CALL(*event_, PushCounter(static_cast<int64_t>(ts),
                                    DoubleEq(value * 1024.0), TrackId{0u}));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   EXPECT_EQ(context_.storage->track_table().row_count(), 1u);
 }
@@ -615,6 +625,7 @@
   EXPECT_CALL(*event_, PushCounter(static_cast<int64_t>(ts), DoubleEq(value),
                                    TrackId{0u}));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   EXPECT_EQ(context_.storage->track_table().row_count(), 1u);
 }
@@ -632,6 +643,7 @@
               SetProcessMetadata(1, Eq(3u), base::StringView(kProcName1),
                                  base::StringView(kProcName1)));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadProcessPacket_FirstCmdline) {
@@ -649,6 +661,7 @@
               SetProcessMetadata(1, Eq(3u), base::StringView(kProcName1),
                                  base::StringView("proc1 proc2")));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadThreadPacket) {
@@ -659,11 +672,10 @@
 
   EXPECT_CALL(*process_, UpdateThread(1, 2));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, ProcessNameFromProcessDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -707,8 +719,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, ThreadNameFromThreadDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -763,9 +773,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutInternedData) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -815,8 +822,7 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   tables::ThreadTable::Row row(16);
   row.upid = 1u;
@@ -860,9 +866,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutInternedDataWithTypes) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -909,8 +912,7 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   tables::ThreadTable::Row row(16);
   row.upid = 1u;
@@ -953,9 +955,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithInternedData) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1087,8 +1086,7 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   tables::ThreadTable::Row row(16);
   row.upid = 2u;
@@ -1178,9 +1176,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventAsyncEvents) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1334,9 +1329,6 @@
 
 // TODO(eseckler): Also test instant events on separate tracks.
 TEST_F(ProtoTraceParserTest, TrackEventWithTrackDescriptors) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   // Sequence 1.
   {
     auto* packet = trace_->add_packet();
@@ -1540,9 +1532,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithResortedCounterDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   // Descriptors with timestamps after the event below. They will be tokenized
   // in the order they appear here, but then resorted before parsing to appear
   // after the events below.
@@ -1643,9 +1632,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutIncrementalStateReset) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1704,9 +1690,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutThreadDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     // Event should be discarded because it specifies delta timestamps and no
     // thread descriptor was seen yet.
@@ -1746,9 +1729,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithDataLoss) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1762,7 +1742,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1815,7 +1795,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 2010.
+    event->set_timestamp_delta_us(10);  // absolute: 2010.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1840,9 +1820,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventMultipleSequences) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1856,7 +1833,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1883,7 +1860,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(2);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1005.
+    event->set_timestamp_delta_us(10);  // absolute: 1005.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1901,7 +1878,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1020.
+    event->set_timestamp_delta_us(10);  // absolute: 1020.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1911,7 +1888,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(2);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1015.
+    event->set_timestamp_delta_us(10);  // absolute: 1015.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1920,10 +1897,8 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
-  EXPECT_CALL(*process_, UpdateThread(17, 15))
-      .WillRepeatedly(Return(2));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(17, 15)).WillRepeatedly(Return(2));
 
   tables::ThreadTable::Row t1(16);
   t1.upid = 1u;
@@ -1950,8 +1925,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithDebugAnnotations) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
   MockBoundInserter inserter;
 
   {
@@ -1967,7 +1940,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* annotation1 = event->add_debug_annotations();
     annotation1->set_name_iid(1);
@@ -2019,7 +1992,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1020.
+    event->set_timestamp_delta_us(10);  // absolute: 1020.
     event->add_category_iids(1);
     auto* annotation3 = event->add_debug_annotations();
     annotation3->set_name_iid(3);
@@ -2153,9 +2126,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithTaskExecution) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2169,7 +2139,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* task_execution = event->set_task_execution();
     task_execution->set_posted_from_iid(1);
@@ -2218,9 +2188,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithLogMessage) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2234,7 +2201,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
 
     auto* log_message = event->set_log_message();
@@ -2294,9 +2261,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventParseLegacyEventIntoRawTable) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2398,9 +2362,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventLegacyTimestampsWithClockSnapshot) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   clock_->AddSnapshot({{protos::pbzero::BUILTIN_CLOCK_BOOTTIME, 0},
                        {protos::pbzero::BUILTIN_CLOCK_MONOTONIC, 1000000}});
 
@@ -2442,9 +2403,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, ParseEventWithClockIdButWithoutClockSnapshot) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_timestamp(1000);
@@ -2471,9 +2429,6 @@
   static const char kIntName[] = "int_name";
   static const int kIntValue = 123;
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_timestamp(1000);
@@ -2511,9 +2466,6 @@
   static const char kStringName[] = "string_name";
   static const char kStringValue[] = "string_value";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_timestamp(1000);
@@ -2558,9 +2510,6 @@
   static const char kDataPart1[] = "bbb";
   static const char kFullData[] = "aaabbb";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2588,9 +2537,6 @@
 TEST_F(ProtoTraceParserTest, ParseChromeLegacyJsonIntoRawTable) {
   static const char kUserTraceEvent[] = "{\"user\":1}";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2622,9 +2568,6 @@
   static const char kTag1[] = "tag1";
   static const char kTag2[] = "tag2";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   auto* metadata = trace_->add_packet()->set_chrome_benchmark_metadata();
   metadata->set_benchmark_name(kName);
   metadata->add_story_tags(kTag1);
@@ -2653,9 +2596,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, LoadChromeMetadata) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   auto* track_event = trace_->add_packet()->set_chrome_events();
   {
     auto* metadata = track_event->add_metadata();
@@ -2723,6 +2663,7 @@
   }
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   // Packet-level errors reflected in stats storage.
   const auto& stats = context_.storage->stats();
@@ -2775,6 +2716,7 @@
   }
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   // Packet-level errors reflected in stats storage.
   const auto& stats = context_.storage->stats();
@@ -2860,10 +2802,10 @@
     samples->set_process_priority(30);
   }
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   // Verify cpu_profile_samples.
   const auto& samples = storage_->cpu_profile_stack_sample_table();
@@ -2950,6 +2892,7 @@
   EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   const auto& samples = storage_->cpu_profile_stack_sample_table();
   EXPECT_EQ(samples.row_count(), 1u);
@@ -2966,6 +2909,7 @@
   config->set_trace_uuid_msb(2);
 
   ASSERT_TRUE(Tokenize().ok());
+  context_.sorter->ExtractEventsForced();
 
   SqlValue value =
       context_.metadata_tracker->GetMetadataForTesting(metadata::trace_uuid);
@@ -2978,6 +2922,7 @@
   config->add_buffers()->set_size_kb(42);
 
   ASSERT_TRUE(Tokenize().ok());
+  context_.sorter->ExtractEventsForced();
 
   SqlValue value = context_.metadata_tracker->GetMetadataForTesting(
       metadata::trace_config_pbtxt);
diff --git a/src/trace_processor/importers/proto/proto_trace_reader.cc b/src/trace_processor/importers/proto/proto_trace_reader.cc
index 4b133a8..e8ca900 100644
--- a/src/trace_processor/importers/proto/proto_trace_reader.cc
+++ b/src/trace_processor/importers/proto/proto_trace_reader.cc
@@ -233,49 +233,12 @@
 
 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
   protos::pbzero::TraceConfig::Decoder trace_config(blob);
-  // If we're forcing a full sort, we can keep using the INT_MAX duration set
-  // when we created the sorter.
-  const auto& cfg = context_->config;
-  if (cfg.sorting_mode == SortingMode::kForceFullSort) {
-    return;
-  }
-
-  base::Optional<int64_t> flush_period_window_size_ns;
-  if (trace_config.has_flush_period_ms() &&
-      trace_config.flush_period_ms() > 0) {
-    flush_period_window_size_ns =
-        static_cast<int64_t>(trace_config.flush_period_ms()) * 2 * 1000 * 1000;
-  }
-
-  // If we're trying to force flush period based sorting, use that as the
-  // window size if it exists.
-  if (cfg.sorting_mode == SortingMode::kForceFlushPeriodWindowedSort &&
-      flush_period_window_size_ns.has_value()) {
-    context_->sorter->SetWindowSizeNs(flush_period_window_size_ns.value());
-    return;
-  }
-
-  // If we end up here, we should use heuristics because either the sorting mode
-  // was set as such or we don't have a flush period to force the window size
-  // to.
-
-  // If we're not forcing anything and this is a write_into_file trace, then
-  // use flush_period_ms as an indiciator for how big the sliding window for the
-  // sorter should be.
-  if (trace_config.write_into_file()) {
-    int64_t window_size_ns;
-    if (flush_period_window_size_ns.has_value()) {
-      window_size_ns = flush_period_window_size_ns.value();
-    } else {
-      constexpr uint64_t kDefaultWindowNs =
-          180 * 1000 * 1000 * 1000ULL;  // 3 minutes.
-      PERFETTO_ELOG(
-          "It is strongly recommended to have flush_period_ms set when "
-          "write_into_file is turned on. You will likely have many dropped "
-          "events because of inability to sort the events correctly.");
-      window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
-    }
-    context_->sorter->SetWindowSizeNs(window_size_ns);
+  if (trace_config.write_into_file() && !trace_config.flush_period_ms()) {
+    PERFETTO_ELOG(
+        "It is strongly recommended to have flush_period_ms set when "
+        "write_into_file is turned on. This trace will be loaded fully "
+        "into memory before sorting which increases the likelihood of "
+        "OOMs.");
   }
 }
 
@@ -448,6 +411,12 @@
     context_->metadata_tracker->SetMetadata(
         metadata::all_data_source_started_ns, Variadic::Integer(ts));
   }
+  if (tse.all_data_sources_flushed()) {
+    context_->sorter->NotifyFlushEvent();
+  }
+  if (tse.read_tracing_buffers_completed()) {
+    context_->sorter->NotifyReadBufferEvent();
+  }
   return util::OkStatus();
 }
 
diff --git a/src/trace_processor/storage/stats.h b/src/trace_processor/storage/stats.h
index 4a05a74..ea71ef0 100644
--- a/src/trace_processor/storage/stats.h
+++ b/src/trace_processor/storage/stats.h
@@ -185,7 +185,11 @@
       "the tracing service. This happens if the ftrace buffers were not "      \
       "cleared properly. These packets are silently dropped by trace "         \
       "processor."),                                                           \
-  F(perf_guardrail_stop_ts,             kIndexed, kDataLoss, kTrace,    "")
+  F(perf_guardrail_stop_ts,             kIndexed, kDataLoss, kTrace,    ""),   \
+  F(sorter_push_event_out_of_order,     kSingle, kError,     kTrace,           \
+       "Trace events are out of order even after sorting. This can happen "   \
+       "due to many factors including clock sync drift, producers emitting "   \
+       "events out of order or a bug in trace processor's logic of sorting.")
 // clang-format on
 
 enum Type {
diff --git a/src/trace_processor/trace_sorter.cc b/src/trace_processor/trace_sorter.cc
index c1bc9e5..a3907dd 100644
--- a/src/trace_processor/trace_sorter.cc
+++ b/src/trace_processor/trace_sorter.cc
@@ -24,9 +24,12 @@
 namespace perfetto {
 namespace trace_processor {
 
-TraceSorter::TraceSorter(std::unique_ptr<TraceParser> parser,
-                         int64_t window_size_ns)
-    : parser_(std::move(parser)), window_size_ns_(window_size_ns) {
+TraceSorter::TraceSorter(TraceProcessorContext* context,
+                         std::unique_ptr<TraceParser> parser,
+                         SortingMode sorting_mode)
+    : context_(context),
+      parser_(std::move(parser)),
+      sorting_mode_(sorting_mode) {
   const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
   bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
   if (bypass_next_stage_for_testing_)
@@ -56,10 +59,10 @@
   PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
 }
 
-// Removes all the events in |queues_| that are earlier than the given window
-// size and moves them to the next parser stages, respecting global timestamp
-// order. This function is a "extract min from N sorted queues", with some
-// little cleverness: we know that events tend to be bursty, so events are
+// Removes all the events in |queues_| that are earlier than the given
+// packet index and moves them to the next parser stages, respecting global
+// timestamp order. This function is a "extract min from N sorted queues", with
+// some little cleverness: we know that events tend to be bursty, so events are
 // not going to be randomly distributed on the N |queues_|.
 // Upon each iteration this function finds the first two queues (if any) that
 // have the oldest events, and extracts events from the 1st until hitting the
@@ -76,12 +79,8 @@
 // to avoid re-scanning all the queues all the times) but doesn't seem worth it.
 // With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
 // time in a profiler.
-void TraceSorter::SortAndExtractEventsBeyondWindow(int64_t window_size_ns) {
-  DCHECK_ftrace_batch_cpu(kNoBatch);
-
+void TraceSorter::SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx) {
   constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
-  const bool was_empty = global_min_ts_ == kTsMax && global_max_ts_ == 0;
-  int64_t extract_end_ts = global_max_ts_ - window_size_ns;
   size_t iterations = 0;
   for (;; iterations++) {
     size_t min_queue_idx = 0;  // The index of the queue with the min(ts).
@@ -122,13 +121,14 @@
     PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_);
 
     // Now that we identified the min-queue, extract all events from it until
-    // we hit either: (1) the min-ts of the 2nd queue or (2) the window limit,
-    // whichever comes first.
-    int64_t extract_until_ts = std::min(extract_end_ts, min_queue_ts[1]);
+    // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
+    // limit, whichever comes first.
     size_t num_extracted = 0;
     for (auto& event : events) {
-      if (event.timestamp > extract_until_ts)
+      if (event.packet_idx >= limit_packet_idx ||
+          event.timestamp > min_queue_ts[1]) {
         break;
+      }
 
       ++num_extracted;
       MaybePushEvent(min_queue_idx, std::move(event));
@@ -162,12 +162,6 @@
     }
   }  // for(;;)
 
-  // We decide to extract events only when we know (using the global_{min,max}
-  // bounds) that there are eligible events. We should never end up in a
-  // situation where we call this function but then realize that there was
-  // nothing to extract.
-  PERFETTO_DCHECK(iterations > 0 || was_empty);
-
 #if PERFETTO_DCHECK_IS_ON()
   // Check that the global min/max are consistent.
   int64_t dbg_min_ts = kTsMax;
@@ -182,10 +176,15 @@
 }
 
 void TraceSorter::MaybePushEvent(size_t queue_idx, TimestampedTracePiece ttp) {
+  int64_t timestamp = ttp.timestamp;
+  if (timestamp < latest_pushed_event_ts_)
+    context_->storage->IncrementStats(stats::sorter_push_event_out_of_order);
+
+  latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
+
   if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_))
     return;
 
-  int64_t timestamp = ttp.timestamp;
   if (queue_idx == 0) {
     // queues_[0] is for non-ftrace packets.
     parser_->ParseTracePacket(timestamp, std::move(ttp));
diff --git a/src/trace_processor/trace_sorter.h b/src/trace_processor/trace_sorter.h
index a93cd26..50bec23 100644
--- a/src/trace_processor/trace_sorter.h
+++ b/src/trace_processor/trace_sorter.h
@@ -39,21 +39,40 @@
 // This class takes care of sorting events parsed from the trace stream in
 // arbitrary order and pushing them to the next pipeline stages (parsing) in
 // order. In order to support streaming use-cases, sorting happens within a
-// max window. Events are held in the TraceSorter staging area (events_) until
-// either (1) the (max - min) timestamp > window_size; (2) trace EOF.
+// window.
 //
-// This class is designed around the assumption that:
+// Events are held in the TraceSorter staging area (events_) until either:
+// 1. We can determine that it's safe to extract events by observing
+//  TracingServiceEvent Flush and ReadBuffer events
+// 2. The trace EOF is reached
+//
+// Incremental extraction
+//
+// Incremental extraction happens by using a combination of flush and read
+// buffer events from the tracing service. Note that incremental extraction
+// is only applicable for write_into_file traces; ring-buffer traces will
+// be sorted fully in-memory implicitly because there is only a single read
+// buffer call at the end.
+//
+// The algorithm for incremental extraction is explained in detail at
+// go/trace-sorting-is-complicated.
+//
+// Sorting algorithm
+//
+// The sorting algorithm is designed around the assumption that:
 // - Most events come from ftrace.
 // - Ftrace events are sorted within each cpu most of the times.
 //
 // Due to this, this class is oprerates as a streaming merge-sort of N+1 queues
 // (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted (if
 // necessary) before proceeding with the global merge-sort-extract.
-// When an event is pushed through, it is just appeneded to the end of one of
+//
+// When an event is pushed through, it is just appended to the end of one of
 // the N queues. While appending, we keep track of the fact that the queue
 // is still ordered or just lost ordering. When an out-of-order event is
 // detected on a queue we keep track of: (1) the offset within the queue where
 // the chaos begun, (2) the timestamp that broke the ordering.
+//
 // When we decide to extract events from the queues into the next stages of
 // the trace processor, we re-sort the events in the queue. Rather than
 // re-sorting everything all the times, we use the above knowledge to restrict
@@ -65,41 +84,43 @@
 // from there to the end.
 class TraceSorter {
  public:
-  TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);
+  enum class SortingMode {
+    kDefault,
+    kFullSort,
+  };
+
+  TraceSorter(TraceProcessorContext* context,
+              std::unique_ptr<TraceParser> parser,
+              SortingMode);
 
   inline void PushTracePacket(int64_t timestamp,
                               PacketSequenceState* state,
                               TraceBlobView packet) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    AppendNonFtraceAndMaybeExtractEvents(
-        TimestampedTracePiece(timestamp, packet_idx_++, std::move(packet),
-                              state->current_generation()));
+    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
+                                               std::move(packet),
+                                               state->current_generation()));
   }
 
   inline void PushJsonValue(int64_t timestamp, std::string json_value) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    AppendNonFtraceAndMaybeExtractEvents(
+    AppendNonFtraceEvent(
         TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
   }
 
   inline void PushFuchsiaRecord(int64_t timestamp,
                                 std::unique_ptr<FuchsiaRecord> record) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    AppendNonFtraceAndMaybeExtractEvents(
+    AppendNonFtraceEvent(
         TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
   }
 
   inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-
     int64_t timestamp = systrace_line->ts;
-    AppendNonFtraceAndMaybeExtractEvents(TimestampedTracePiece(
-        timestamp, packet_idx_++, std::move(systrace_line)));
+    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
+                                               std::move(systrace_line)));
   }
 
   inline void PushTrackEventPacket(int64_t timestamp,
                                    std::unique_ptr<TrackEventData> data) {
-    AppendNonFtraceAndMaybeExtractEvents(
+    AppendNonFtraceEvent(
         TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
   }
 
@@ -107,65 +128,54 @@
                               int64_t timestamp,
                               TraceBlobView event,
                               PacketSequenceState* state) {
-    set_ftrace_batch_cpu_for_DCHECK(cpu);
-    GetQueue(cpu + 1)->Append(TimestampedTracePiece(
+    auto* queue = GetQueue(cpu + 1);
+    queue->Append(TimestampedTracePiece(
         timestamp, packet_idx_++,
         FtraceEventData{std::move(event), state->current_generation()}));
-
-    // The caller must call FinalizeFtraceEventBatch() after having pushed a
-    // batch of ftrace events. This is to amortize the overhead of handling
-    // global ordering and doing that in batches only after all ftrace events
-    // for a bundle are pushed.
+    UpdateGlobalTs(queue);
   }
   inline void PushInlineFtraceEvent(uint32_t cpu,
                                     int64_t timestamp,
                                     InlineSchedSwitch inline_sched_switch) {
-    set_ftrace_batch_cpu_for_DCHECK(cpu);
-    GetQueue(cpu + 1)->Append(
-        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
-
-    // As with |PushFtraceEvent|, doesn't immediately sort the affected queues.
     // TODO(rsavitski): if a trace has a mix of normal & "compact" events (being
     // pushed through this function), the ftrace batches will no longer be fully
     // sorted by timestamp. In such situations, we will have to sort at the end
     // of the batch. We can do better as both sub-sequences are sorted however.
     // Consider adding extra queues, or pushing them in a merge-sort fashion
     // instead.
+    auto* queue = GetQueue(cpu + 1);
+    queue->Append(
+        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
+    UpdateGlobalTs(queue);
   }
   inline void PushInlineFtraceEvent(uint32_t cpu,
                                     int64_t timestamp,
                                     InlineSchedWaking inline_sched_waking) {
-    set_ftrace_batch_cpu_for_DCHECK(cpu);
-    GetQueue(cpu + 1)->Append(
+    auto* queue = GetQueue(cpu + 1);
+    queue->Append(
         TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
-  }
-  inline void FinalizeFtraceEventBatch(uint32_t cpu) {
-    DCHECK_ftrace_batch_cpu(cpu);
-    set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
-    MaybeExtractEvents(GetQueue(cpu + 1));
+    UpdateGlobalTs(queue);
   }
 
-  // Extract all events ignoring the window.
   void ExtractEventsForced() {
-    SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
+    SortAndExtractEventsUntilPacket(packet_idx_);
     queues_.resize(0);
+
+    packet_idx_for_extraction_ = packet_idx_;
+    flushes_since_extraction_ = 0;
   }
 
-  // Sets the window size to be the size specified (which should be lower than
-  // any previous window size specified) and flushes any data beyond
-  // this window size.
-  // It is undefined to call this function with a window size greater than than
-  // the current size.
-  void SetWindowSizeNs(int64_t window_size_ns) {
-    PERFETTO_DCHECK(window_size_ns <= window_size_ns_);
+  void NotifyFlushEvent() { flushes_since_extraction_++; }
 
-    PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
-    window_size_ns_ = window_size_ns;
-
-    // Fast path: if, globally, we are within the window size, then just exit.
-    if (global_max_ts_ - global_min_ts_ < window_size_ns)
+  void NotifyReadBufferEvent() {
+    if (sorting_mode_ == SortingMode::kFullSort ||
+        flushes_since_extraction_ < 2) {
       return;
-    SortAndExtractEventsBeyondWindow(window_size_ns_);
+    }
+
+    SortAndExtractEventsUntilPacket(packet_idx_for_extraction_);
+    packet_idx_for_extraction_ = packet_idx_;
+    flushes_since_extraction_ = 0;
   }
 
   int64_t max_timestamp() const { return global_max_ts_; }
@@ -210,9 +220,7 @@
     int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
   };
 
-  // This method passes any events older than window_size_ns to the
-  // parser to be parsed and then stored.
-  void SortAndExtractEventsBeyondWindow(int64_t windows_size_ns);
+  void SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx);
 
   inline Queue* GetQueue(size_t index) {
     if (PERFETTO_UNLIKELY(index >= queues_.size()))
@@ -220,57 +228,40 @@
     return &queues_[index];
   }
 
-  inline void AppendNonFtraceAndMaybeExtractEvents(TimestampedTracePiece ttp) {
-    // Fast path: if this event is before all other events in the sorter and
-    // happened more than the window size in the past, just push the event to
-    // the next stage. This saves all the sorting logic which would simply move
-    // this event to the head of the queue and then extract it out.
-    //
-    // In practice, these events will be rejected as being "out-of-order" later
-    // on in trace processor (i.e. in EventTracker or SliceTracker); we don't
-    // drop here to allow them to track packet drop stats.
-    //
-    // See b/188392852 for an example of where this condition would be hit in
-    // practice.
-    bool is_before_all_events = ttp.timestamp < global_max_ts_;
-    bool is_before_window = global_max_ts_ - ttp.timestamp >= window_size_ns_;
-    if (PERFETTO_UNLIKELY(is_before_all_events && is_before_window)) {
-      MaybePushEvent(0, std::move(ttp));
-      return;
-    }
-
-    // Slow path: append the event to the non-ftrace queue and extract any
-    // events if available.
+  inline void AppendNonFtraceEvent(TimestampedTracePiece ttp) {
     Queue* queue = GetQueue(0);
     queue->Append(std::move(ttp));
-    MaybeExtractEvents(queue);
+    UpdateGlobalTs(queue);
   }
 
-  inline void MaybeExtractEvents(Queue* queue) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
+  inline void UpdateGlobalTs(Queue* queue) {
     global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
-
-    // Fast path: if, globally, we are within the window size, then just exit.
-    if (global_max_ts_ - global_min_ts_ < window_size_ns_)
-      return;
-    SortAndExtractEventsBeyondWindow(window_size_ns_);
+    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
   }
 
   void MaybePushEvent(size_t queue_idx,
                       TimestampedTracePiece ttp) PERFETTO_ALWAYS_INLINE;
 
+  TraceProcessorContext* context_;
   std::unique_ptr<TraceParser> parser_;
 
+  // Whether we should ignore incremental extraction and just wait for
+  // forced extraction at the end of the trace.
+  SortingMode sorting_mode_ = SortingMode::kDefault;
+
+  // The packet index until which events should be extracted. Set based
+  // on the packet index in |OnReadBuffer|.
+  uint64_t packet_idx_for_extraction_ = 0;
+
+  // The number of flushes which have happened since the last incremental
+  // extraction.
+  uint32_t flushes_since_extraction_ = 0;
+
   // queues_[0] is the general (non-ftrace) queue.
   // queues_[1] is the ftrace queue for CPU(0).
   // queues_[x] is the ftrace queue for CPU(x - 1).
   std::vector<Queue> queues_;
 
-  // Events are propagated to the next stage only after (max - min) timestamp
-  // is larger than this value.
-  int64_t window_size_ns_;
-
   // max(e.timestamp for e in queues_).
   int64_t global_max_ts_ = 0;
 
@@ -283,23 +274,8 @@
   // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
   bool bypass_next_stage_for_testing_ = false;
 
-#if PERFETTO_DCHECK_IS_ON()
-  // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
-  uint32_t ftrace_batch_cpu_ = kNoBatch;
-
-  inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
-    PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
-  }
-
-  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
-    PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
-                    cpu == kNoBatch);
-    ftrace_batch_cpu_ = cpu;
-  }
-#else
-  inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
-  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
-#endif
+  // max(e.ts for e pushed to next stage)
+  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/trace_sorter_unittest.cc b/src/trace_processor/trace_sorter_unittest.cc
index b49f515..18c6d1e 100644
--- a/src/trace_processor/trace_sorter_unittest.cc
+++ b/src/trace_processor/trace_sorter_unittest.cc
@@ -76,13 +76,16 @@
       : test_buffer_(std::unique_ptr<uint8_t[]>(new uint8_t[8]), 0, 8) {
     storage_ = new NiceMock<MockTraceStorage>();
     context_.storage.reset(storage_);
+    CreateSorter();
+  }
 
+  void CreateSorter(bool full_sort = true) {
     std::unique_ptr<MockTraceParser> parser(new MockTraceParser(&context_));
     parser_ = parser.get();
-
+    auto sorting_mode = full_sort ? TraceSorter::SortingMode::kFullSort
+                                  : TraceSorter::SortingMode::kDefault;
     context_.sorter.reset(
-        new TraceSorter(std::move(parser),
-                        std::numeric_limits<int64_t>::max() /*window_size*/));
+        new TraceSorter(&context_, std::move(parser), sorting_mode));
   }
 
  protected:
@@ -98,7 +101,6 @@
   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view.data(), 1));
   context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
                                    std::move(view), &state);
-  context_.sorter->FinalizeFtraceEventBatch(0);
   context_.sorter->ExtractEventsForced();
 }
 
@@ -107,7 +109,6 @@
   TraceBlobView view = test_buffer_.slice(0, 1);
   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, view.data(), 1));
   context_.sorter->PushTracePacket(1000, &state, std::move(view));
-  context_.sorter->FinalizeFtraceEventBatch(1000);
   context_.sorter->ExtractEventsForced();
 }
 
@@ -125,64 +126,134 @@
   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
 
-  context_.sorter->SetWindowSizeNs(200);
   context_.sorter->PushFtraceEvent(2 /*cpu*/, 1200 /*timestamp*/,
                                    std::move(view_4), &state);
-  context_.sorter->FinalizeFtraceEventBatch(2);
   context_.sorter->PushTracePacket(1001, &state, std::move(view_2));
   context_.sorter->PushTracePacket(1100, &state, std::move(view_3));
   context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
                                    std::move(view_1), &state);
-
-  context_.sorter->FinalizeFtraceEventBatch(0);
   context_.sorter->ExtractEventsForced();
 }
 
-TEST_F(TraceSorterTest, SetWindowSize) {
+TEST_F(TraceSorterTest, IncrementalExtraction) {
+  CreateSorter(false);
+
   PacketSequenceState state(&context_);
+
+  TraceBlobView view_1 = test_buffer_.slice(0, 1);
+  TraceBlobView view_2 = test_buffer_.slice(0, 2);
+  TraceBlobView view_3 = test_buffer_.slice(0, 3);
+  TraceBlobView view_4 = test_buffer_.slice(0, 4);
+  TraceBlobView view_5 = test_buffer_.slice(0, 5);
+
+  // Flush at the start of packet sequence to match behavior of the
+  // service.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1200, &state, std::move(view_2));
+  context_.sorter->PushTracePacket(1100, &state, std::move(view_1));
+
+  // No data should be extracted at this point because we haven't
+  // seen two flushes yet.
+  context_.sorter->NotifyReadBufferEvent();
+
+  // Now that we've seen two flushes, we should be ready to start extracting
+  // data on the next OnReadBuffer call (after two flushes as usual).
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyReadBufferEvent();
+
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1400, &state, std::move(view_4));
+  context_.sorter->PushTracePacket(1300, &state, std::move(view_3));
+
+  // This ReadBuffer call should finally extract until the first OnReadBuffer
+  // call.
+  {
+    InSequence s;
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
+  }
+  context_.sorter->NotifyReadBufferEvent();
+
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1500, &state, std::move(view_5));
+
+  // Nothing should be extracted as we haven't seen the second flush.
+  context_.sorter->NotifyReadBufferEvent();
+
+  // Now we've seen the second flush we should extract the next two packets.
+  context_.sorter->NotifyFlushEvent();
+  {
+    InSequence s;
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1300, test_buffer_.data(), 3));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1400, test_buffer_.data(), 4));
+  }
+  context_.sorter->NotifyReadBufferEvent();
+
+  // The forced extraction should get the last packet.
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1500, test_buffer_.data(), 5));
+  context_.sorter->ExtractEventsForced();
+}
+
+// Simulate a producer bug where the third packet is emitted
+// out of order. Verify that we track the stats correctly.
+TEST_F(TraceSorterTest, OutOfOrder) {
+  CreateSorter(false);
+
+  PacketSequenceState state(&context_);
+
   TraceBlobView view_1 = test_buffer_.slice(0, 1);
   TraceBlobView view_2 = test_buffer_.slice(0, 2);
   TraceBlobView view_3 = test_buffer_.slice(0, 3);
   TraceBlobView view_4 = test_buffer_.slice(0, 4);
 
-  MockFunction<void(std::string check_point_name)> check;
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1200, &state, std::move(view_2));
+  context_.sorter->PushTracePacket(1100, &state, std::move(view_1));
+  context_.sorter->NotifyReadBufferEvent();
 
+  // Both of the packets should have been pushed through.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
   {
     InSequence s;
-
-    EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view_1.data(), 1));
-    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, view_2.data(), 2));
-    EXPECT_CALL(check, Call("1"));
-    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
-    EXPECT_CALL(check, Call("2"));
-    EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
   }
+  context_.sorter->NotifyReadBufferEvent();
 
-  context_.sorter->SetWindowSizeNs(200);
-  context_.sorter->PushFtraceEvent(2 /*cpu*/, 1200 /*timestamp*/,
-                                   std::move(view_4), &state);
-  context_.sorter->FinalizeFtraceEventBatch(2);
-  context_.sorter->PushTracePacket(1001, &state, std::move(view_2));
-  context_.sorter->PushTracePacket(1100, &state, std::move(view_3));
+  // Now, pass the third packet out of order.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1150, &state, std::move(view_3));
+  context_.sorter->NotifyReadBufferEvent();
 
-  context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
-                                   std::move(view_1), &state);
-  context_.sorter->FinalizeFtraceEventBatch(0);
+  // The third packet should still be pushed through.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1150, test_buffer_.data(), 3));
+  context_.sorter->NotifyReadBufferEvent();
 
-  // At this point, we should just flush the 1000 and 1001 packets.
-  context_.sorter->SetWindowSizeNs(101);
+  // But we should also increment the stat that this was out of order.
+  ASSERT_EQ(
+      context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
+      1);
 
-  // Inform the mock about where we are.
-  check.Call("1");
+  // Push the fourth packet also out of order but after third.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1170, &state, std::move(view_4));
+  context_.sorter->NotifyReadBufferEvent();
 
-  // Now we should flush the 1100 packet.
-  context_.sorter->SetWindowSizeNs(99);
-
-  // Inform the mock about where we are.
-  check.Call("2");
-
-  // Now we should flush the 1200 packet.
+  // The fourth packet should still be pushed through.
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1170, test_buffer_.data(), 4));
   context_.sorter->ExtractEventsForced();
+
+  // But we should also increment the stat that this was out of order.
+  ASSERT_EQ(
+      context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
+      2);
 }
 
 // Simulates a random stream of ftrace events happening on random CPUs.
@@ -219,7 +290,6 @@
       expectations[ts].push_back(cpu);
       context_.sorter->PushFtraceEvent(cpu, ts, TraceBlobView(nullptr, 0, 0),
                                        &state);
-      context_.sorter->FinalizeFtraceEventBatch(cpu);
     }
   }