Merge "Improve chrome_slice rendering perf"
diff --git a/CHANGELOG b/CHANGELOG
index 0861716..e63367e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -11,6 +11,12 @@
       new-line when specifying a query file with -q to trace processor shell.
     * Added "ancestor_slice_by_stack" and "descendant_slice_by_stack" table
       functions to walk up and down the slice stacks.
+    * Overhauled windowed sorting to be based on packet ordering and
+      lifecycle events inside the trace instead of time-based ordering.
+    * Deprecated |SortingMode::kForceFlushPeriodWindowedSort| due to the
+      windowed sorting changes. Embedders should switch to
+      |SortingMode::kDefaultHeuristics|; this option will be removed
+      in v21.
   UI:
     *
   SDK:
diff --git a/include/perfetto/trace_processor/basic_types.h b/include/perfetto/trace_processor/basic_types.h
index 05735b9..5b459d0 100644
--- a/include/perfetto/trace_processor/basic_types.h
+++ b/include/perfetto/trace_processor/basic_types.h
@@ -40,11 +40,12 @@
 constexpr char kMetricProtoRoot[] = "protos/perfetto/metrics/";
 
 // Enum which encodes how trace processor should try to sort the ingested data.
+// Note that these options are only applicable to proto traces; other trace
+// types (e.g. JSON, Fuchsia) use full sorts.
 enum class SortingMode {
   // This option allows trace processor to use built-in heuristics about how to
   // sort the data. Generally, this option is correct for most embedders as
-  // trace processor reads information from the trace (e.g. TraceConfig) to make
-  // the best decision.
+  // trace processor reads information from the trace to make the best decision.
   //
   // The exact heuristics are implementation details but will ensure that all
   // relevant tables are sorted by timestamp.
@@ -58,14 +59,19 @@
   // data to be skipped.
   kForceFullSort = 1,
 
-  // This option forces trace processor to use the |flush_period_ms| specified
-  // in the TraceConfig to perform a windowed sort of the data. The window size
-  // is not guaranteed to be exactly |flush_period_ms| but will be of the same
-  // order of magnitude; the exact value is an implementation detail and should
-  // not be relied upon.
+  // This option is deprecated in v18; trace processor will ignore it and
+  // use |kDefaultHeuristics|.
   //
-  // If a |flush_period_ms| is not specified in the TraceConfig, this mode will
-  // act the same as |SortingMode::kDefaultHeuristics|.
+  // Rationale for deprecation:
+  // The new windowed sorting logic in trace processor uses a combination of
+  // flush and buffer-read lifecycle events inside the trace instead of
+  // using time-periods from the config.
+  //
+  // Recommended migration:
+  // Users of this option should switch to using |kDefaultHeuristics|, which
+  // will act very similarly to the pre-v20 behaviour of this option.
+  //
+  // This option is scheduled to be removed in v21.
   kForceFlushPeriodWindowedSort = 2
 };
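
For embedders, the recommended migration is a one-line config change. A
minimal sketch, assuming the standard Config / TraceProcessor::CreateInstance
entry points of the trace processor public API:

    #include "perfetto/trace_processor/basic_types.h"
    #include "perfetto/trace_processor/trace_processor.h"

    using namespace perfetto::trace_processor;

    Config config;
    // Before: config.sorting_mode = SortingMode::kForceFlushPeriodWindowedSort;
    // From v18 the old option is ignored (and it will be removed in v21), so
    // rely on the default heuristics instead.
    config.sorting_mode = SortingMode::kDefaultHeuristics;
    std::unique_ptr<TraceProcessor> tp = TraceProcessor::CreateInstance(config);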
 
diff --git a/src/trace_processor/forwarding_trace_parser.cc b/src/trace_processor/forwarding_trace_parser.cc
index 94bcc67..dc2cf54 100644
--- a/src/trace_processor/forwarding_trace_parser.cc
+++ b/src/trace_processor/forwarding_trace_parser.cc
@@ -40,6 +40,17 @@
   return str;
 }
 
+TraceSorter::SortingMode ConvertSortingMode(SortingMode sorting_mode) {
+  switch (sorting_mode) {
+    case SortingMode::kDefaultHeuristics:
+    case SortingMode::kForceFlushPeriodWindowedSort:
+      return TraceSorter::SortingMode::kDefault;
+    case SortingMode::kForceFullSort:
+      return TraceSorter::SortingMode::kFullSort;
+  }
+  PERFETTO_FATAL("For GCC");
+}
+
 // Fuchsia traces have a magic number as documented here:
 // https://fuchsia.googlesource.com/fuchsia/+/HEAD/docs/development/tracing/trace-format/README.md#magic-number-record-trace-info-type-0
 constexpr uint64_t kFuchsiaMagicNumber = 0x0016547846040010;
@@ -55,7 +66,6 @@
                                           size_t size) {
   // If this is the first Parse() call, guess the trace type and create the
   // appropriate parser.
-  static const int64_t kMaxWindowSize = std::numeric_limits<int64_t>::max();
   if (!reader_) {
     TraceType trace_type;
     {
@@ -70,8 +80,9 @@
           reader_ = std::move(context_->json_trace_tokenizer);
 
           // JSON traces have no guarantees about the order of events in them.
-          context_->sorter.reset(new TraceSorter(
-              std::move(context_->json_trace_parser), kMaxWindowSize));
+          context_->sorter.reset(
+              new TraceSorter(context_, std::move(context_->json_trace_parser),
+                              TraceSorter::SortingMode::kFullSort));
         } else {
           return util::ErrStatus("JSON support is disabled");
         }
@@ -79,12 +90,12 @@
       }
       case kProtoTraceType: {
         PERFETTO_DLOG("Proto trace detected");
-        // This will be reduced once we read the trace config and we see flush
-        // period being set.
+        auto sorting_mode = ConvertSortingMode(context_->config.sorting_mode);
         reader_.reset(new ProtoTraceReader(context_));
         context_->sorter.reset(new TraceSorter(
+            context_,
             std::unique_ptr<TraceParser>(new ProtoTraceParser(context_)),
-            kMaxWindowSize));
+            sorting_mode));
         context_->process_tracker->SetPidZeroIgnoredForIdleProcess();
         break;
       }
@@ -101,7 +112,8 @@
 
           // Fuchsia traces can have massively out-of-order events.
           context_->sorter.reset(new TraceSorter(
-              std::move(context_->fuchsia_trace_parser), kMaxWindowSize));
+              context_, std::move(context_->fuchsia_trace_parser),
+              TraceSorter::SortingMode::kFullSort));
         } else {
           return util::ErrStatus("Fuchsia support is disabled");
         }
diff --git a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
index 5aa9e38..7253e8a 100644
--- a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
+++ b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
@@ -93,7 +93,6 @@
     size_t off = bundle.offset_of(it->data());
     TokenizeFtraceEvent(cpu, clock_id, bundle.slice(off, it->size()), state);
   }
-  context_->sorter->FinalizeFtraceEventBatch(cpu);
   return base::OkStatus();
 }
 
diff --git a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
index c0f11cd..60c1606 100644
--- a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
+++ b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
@@ -252,7 +252,8 @@
     context_.flow_tracker.reset(flow_);
     clock_ = new ClockTracker(&context_);
     context_.clock_tracker.reset(clock_);
-    context_.sorter.reset(new TraceSorter(CreateParser(), 0 /*window size*/));
+    context_.sorter.reset(new TraceSorter(&context_, CreateParser(),
+                                          TraceSorter::SortingMode::kFullSort));
     context_.descriptor_pool_.reset(new DescriptorPool());
 
     RegisterDefaultModules(&context_);
@@ -333,6 +334,7 @@
               PushSchedSwitch(10, 1000, 10, base::StringView(kProc2Name), 256,
                               32, 100, base::StringView(kProc1Name), 1024));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadEventsIntoRaw) {
@@ -364,6 +366,7 @@
   EXPECT_CALL(*process_, GetOrCreateProcess(123));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   const auto& raw = context_.storage->raw_table();
   ASSERT_EQ(raw.row_count(), 2u);
@@ -414,6 +417,7 @@
   field->set_uint_value(3);
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   const auto& raw = storage_->raw_table();
 
@@ -481,6 +485,7 @@
                               32, 10, base::StringView(kProcName2), 512));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadMultiplePackets) {
@@ -526,6 +531,7 @@
               PushSchedSwitch(10, 1001, 100, base::StringView(kProcName1), 256,
                               32, 10, base::StringView(kProcName2), 512));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, RepeatedLoadSinglePacket) {
@@ -548,6 +554,7 @@
               PushSchedSwitch(10, 1000, 10, base::StringView(kProcName2), 256,
                               32, 100, base::StringView(kProcName1), 1024));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   bundle = trace_->add_packet()->set_ftrace_events();
   bundle->set_cpu(10);
@@ -567,6 +574,7 @@
               PushSchedSwitch(10, 1001, 100, base::StringView(kProcName1), 256,
                               32, 10, base::StringView(kProcName2), 512));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadCpuFreq) {
@@ -581,6 +589,7 @@
 
   EXPECT_CALL(*event_, PushCounter(1000, DoubleEq(2000), TrackId{0}));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   EXPECT_EQ(context_.storage->cpu_counter_track_table().cpu()[0], 10u);
 }
@@ -598,6 +607,7 @@
   EXPECT_CALL(*event_, PushCounter(static_cast<int64_t>(ts),
                                    DoubleEq(value * 1024.0), TrackId{0u}));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   EXPECT_EQ(context_.storage->track_table().row_count(), 1u);
 }
@@ -615,6 +625,7 @@
   EXPECT_CALL(*event_, PushCounter(static_cast<int64_t>(ts), DoubleEq(value),
                                    TrackId{0u}));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   EXPECT_EQ(context_.storage->track_table().row_count(), 1u);
 }
@@ -632,6 +643,7 @@
               SetProcessMetadata(1, Eq(3u), base::StringView(kProcName1),
                                  base::StringView(kProcName1)));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadProcessPacket_FirstCmdline) {
@@ -649,6 +661,7 @@
               SetProcessMetadata(1, Eq(3u), base::StringView(kProcName1),
                                  base::StringView("proc1 proc2")));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, LoadThreadPacket) {
@@ -659,11 +672,10 @@
 
   EXPECT_CALL(*process_, UpdateThread(1, 2));
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 }
 
 TEST_F(ProtoTraceParserTest, ProcessNameFromProcessDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -707,8 +719,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, ThreadNameFromThreadDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -763,9 +773,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutInternedData) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -815,8 +822,7 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   tables::ThreadTable::Row row(16);
   row.upid = 1u;
@@ -860,9 +866,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutInternedDataWithTypes) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -909,8 +912,7 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   tables::ThreadTable::Row row(16);
   row.upid = 1u;
@@ -953,9 +955,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithInternedData) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1087,8 +1086,7 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   tables::ThreadTable::Row row(16);
   row.upid = 2u;
@@ -1178,9 +1176,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventAsyncEvents) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1334,9 +1329,6 @@
 
 // TODO(eseckler): Also test instant events on separate tracks.
 TEST_F(ProtoTraceParserTest, TrackEventWithTrackDescriptors) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   // Sequence 1.
   {
     auto* packet = trace_->add_packet();
@@ -1540,9 +1532,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithResortedCounterDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   // Descriptors with timestamps after the event below. They will be tokenized
   // in the order they appear here, but then resorted before parsing to appear
   // after the events below.
@@ -1643,9 +1632,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutIncrementalStateReset) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1704,9 +1690,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithoutThreadDescriptor) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     // Event should be discarded because it specifies delta timestamps and no
     // thread descriptor was seen yet.
@@ -1746,9 +1729,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithDataLoss) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1762,7 +1742,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1815,7 +1795,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 2010.
+    event->set_timestamp_delta_us(10);  // absolute: 2010.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1840,9 +1820,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventMultipleSequences) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -1856,7 +1833,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1883,7 +1860,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(2);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1005.
+    event->set_timestamp_delta_us(10);  // absolute: 1005.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1901,7 +1878,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1020.
+    event->set_timestamp_delta_us(10);  // absolute: 1020.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1911,7 +1888,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(2);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1015.
+    event->set_timestamp_delta_us(10);  // absolute: 1015.
     event->add_category_iids(1);
     auto* legacy_event = event->set_legacy_event();
     legacy_event->set_name_iid(1);
@@ -1920,10 +1897,8 @@
 
   Tokenize();
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
-  EXPECT_CALL(*process_, UpdateThread(17, 15))
-      .WillRepeatedly(Return(2));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(17, 15)).WillRepeatedly(Return(2));
 
   tables::ThreadTable::Row t1(16);
   t1.upid = 1u;
@@ -1950,8 +1925,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithDebugAnnotations) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
   MockBoundInserter inserter;
 
   {
@@ -1967,7 +1940,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* annotation1 = event->add_debug_annotations();
     annotation1->set_name_iid(1);
@@ -2019,7 +1992,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1020.
+    event->set_timestamp_delta_us(10);  // absolute: 1020.
     event->add_category_iids(1);
     auto* annotation3 = event->add_debug_annotations();
     annotation3->set_name_iid(3);
@@ -2153,9 +2126,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithTaskExecution) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2169,7 +2139,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
     auto* task_execution = event->set_task_execution();
     task_execution->set_posted_from_iid(1);
@@ -2218,9 +2188,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventWithLogMessage) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2234,7 +2201,7 @@
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
     auto* event = packet->set_track_event();
-    event->set_timestamp_delta_us(10);   // absolute: 1010.
+    event->set_timestamp_delta_us(10);  // absolute: 1010.
     event->add_category_iids(1);
 
     auto* log_message = event->set_log_message();
@@ -2294,9 +2261,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventParseLegacyEventIntoRawTable) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2398,9 +2362,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, TrackEventLegacyTimestampsWithClockSnapshot) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   clock_->AddSnapshot({{protos::pbzero::BUILTIN_CLOCK_BOOTTIME, 0},
                        {protos::pbzero::BUILTIN_CLOCK_MONOTONIC, 1000000}});
 
@@ -2442,9 +2403,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, ParseEventWithClockIdButWithoutClockSnapshot) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_timestamp(1000);
@@ -2471,9 +2429,6 @@
   static const char kIntName[] = "int_name";
   static const int kIntValue = 123;
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_timestamp(1000);
@@ -2511,9 +2466,6 @@
   static const char kStringName[] = "string_name";
   static const char kStringValue[] = "string_value";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_timestamp(1000);
@@ -2558,9 +2510,6 @@
   static const char kDataPart1[] = "bbb";
   static const char kFullData[] = "aaabbb";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2588,9 +2537,6 @@
 TEST_F(ProtoTraceParserTest, ParseChromeLegacyJsonIntoRawTable) {
   static const char kUserTraceEvent[] = "{\"user\":1}";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   {
     auto* packet = trace_->add_packet();
     packet->set_trusted_packet_sequence_id(1);
@@ -2622,9 +2568,6 @@
   static const char kTag1[] = "tag1";
   static const char kTag2[] = "tag2";
 
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   auto* metadata = trace_->add_packet()->set_chrome_benchmark_metadata();
   metadata->set_benchmark_name(kName);
   metadata->add_story_tags(kTag1);
@@ -2653,9 +2596,6 @@
 }
 
 TEST_F(ProtoTraceParserTest, LoadChromeMetadata) {
-  context_.sorter.reset(new TraceSorter(
-      CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
   auto* track_event = trace_->add_packet()->set_chrome_events();
   {
     auto* metadata = track_event->add_metadata();
@@ -2723,6 +2663,7 @@
   }
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   // Packet-level errors reflected in stats storage.
   const auto& stats = context_.storage->stats();
@@ -2775,6 +2716,7 @@
   }
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   // Packet-level errors reflected in stats storage.
   const auto& stats = context_.storage->stats();
@@ -2860,10 +2802,10 @@
     samples->set_process_priority(30);
   }
 
-  EXPECT_CALL(*process_, UpdateThread(16, 15))
-      .WillRepeatedly(Return(1));
+  EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   // Verify cpu_profile_samples.
   const auto& samples = storage_->cpu_profile_stack_sample_table();
@@ -2950,6 +2892,7 @@
   EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
 
   Tokenize();
+  context_.sorter->ExtractEventsForced();
 
   const auto& samples = storage_->cpu_profile_stack_sample_table();
   EXPECT_EQ(samples.row_count(), 1u);
@@ -2966,6 +2909,7 @@
   config->set_trace_uuid_msb(2);
 
   ASSERT_TRUE(Tokenize().ok());
+  context_.sorter->ExtractEventsForced();
 
   SqlValue value =
       context_.metadata_tracker->GetMetadataForTesting(metadata::trace_uuid);
@@ -2978,6 +2922,7 @@
   config->add_buffers()->set_size_kb(42);
 
   ASSERT_TRUE(Tokenize().ok());
+  context_.sorter->ExtractEventsForced();
 
   SqlValue value = context_.metadata_tracker->GetMetadataForTesting(
       metadata::trace_config_pbtxt);
diff --git a/src/trace_processor/importers/proto/proto_trace_reader.cc b/src/trace_processor/importers/proto/proto_trace_reader.cc
index 4b133a8..e8ca900 100644
--- a/src/trace_processor/importers/proto/proto_trace_reader.cc
+++ b/src/trace_processor/importers/proto/proto_trace_reader.cc
@@ -233,49 +233,12 @@
 
 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
   protos::pbzero::TraceConfig::Decoder trace_config(blob);
-  // If we're forcing a full sort, we can keep using the INT_MAX duration set
-  // when we created the sorter.
-  const auto& cfg = context_->config;
-  if (cfg.sorting_mode == SortingMode::kForceFullSort) {
-    return;
-  }
-
-  base::Optional<int64_t> flush_period_window_size_ns;
-  if (trace_config.has_flush_period_ms() &&
-      trace_config.flush_period_ms() > 0) {
-    flush_period_window_size_ns =
-        static_cast<int64_t>(trace_config.flush_period_ms()) * 2 * 1000 * 1000;
-  }
-
-  // If we're trying to force flush period based sorting, use that as the
-  // window size if it exists.
-  if (cfg.sorting_mode == SortingMode::kForceFlushPeriodWindowedSort &&
-      flush_period_window_size_ns.has_value()) {
-    context_->sorter->SetWindowSizeNs(flush_period_window_size_ns.value());
-    return;
-  }
-
-  // If we end up here, we should use heuristics because either the sorting mode
-  // was set as such or we don't have a flush period to force the window size
-  // to.
-
-  // If we're not forcing anything and this is a write_into_file trace, then
-  // use flush_period_ms as an indiciator for how big the sliding window for the
-  // sorter should be.
-  if (trace_config.write_into_file()) {
-    int64_t window_size_ns;
-    if (flush_period_window_size_ns.has_value()) {
-      window_size_ns = flush_period_window_size_ns.value();
-    } else {
-      constexpr uint64_t kDefaultWindowNs =
-          180 * 1000 * 1000 * 1000ULL;  // 3 minutes.
-      PERFETTO_ELOG(
-          "It is strongly recommended to have flush_period_ms set when "
-          "write_into_file is turned on. You will likely have many dropped "
-          "events because of inability to sort the events correctly.");
-      window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
-    }
-    context_->sorter->SetWindowSizeNs(window_size_ns);
+  if (trace_config.write_into_file() && !trace_config.flush_period_ms()) {
+    PERFETTO_ELOG(
+        "It is strongly recommended to have flush_period_ms set when "
+        "write_into_file is turned on. This trace will be loaded fully "
+        "into memory before sorting which increases the likliehoold of "
+        "OOMs.");
   }
 }
 
@@ -448,6 +411,12 @@
     context_->metadata_tracker->SetMetadata(
         metadata::all_data_source_started_ns, Variadic::Integer(ts));
   }
+  if (tse.all_data_sources_flushed()) {
+    context_->sorter->NotifyFlushEvent();
+  }
+  if (tse.read_tracing_buffers_completed()) {
+    context_->sorter->NotifyReadBufferEvent();
+  }
   return util::OkStatus();
 }
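
Taken together, these hooks mean the sorter learns its extraction points from
the trace itself rather than from flush_period_ms. A hedged sketch of the
notification sequence a write_into_file trace would drive (the comments name
the TracingServiceEvent fields that trigger each call):

    // Illustrative only; ProtoTraceReader issues these calls while tokenizing.
    sorter->NotifyFlushEvent();       // all_data_sources_flushed seen.
    sorter->NotifyFlushEvent();       // Second flush since the last extraction.
    sorter->NotifyReadBufferEvent();  // read_tracing_buffers_completed: events
                                      // up to the packet index recorded at the
                                      // previous read are now safe to extract.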
 
diff --git a/src/trace_processor/storage/stats.h b/src/trace_processor/storage/stats.h
index 4a05a74..ea71ef0 100644
--- a/src/trace_processor/storage/stats.h
+++ b/src/trace_processor/storage/stats.h
@@ -185,7 +185,11 @@
       "the tracing service. This happens if the ftrace buffers were not "      \
       "cleared properly. These packets are silently dropped by trace "         \
       "processor."),                                                           \
-  F(perf_guardrail_stop_ts,             kIndexed, kDataLoss, kTrace,    "")
+  F(perf_guardrail_stop_ts,             kIndexed, kDataLoss, kTrace,    ""),   \
+  F(sorter_push_event_out_of_order,     kSingle,  kError,    kTrace,          \
+       "Trace events are out of order even after sorting. This can happen "   \
+       "due to many factors including clock sync drift, producers emitting "  \
+       "events out of order, or a bug in trace processor's sorting logic.")
 // clang-format on
 
 enum Type {
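
Embedders can observe the new stat through the stats table; a small sketch,
assuming the public ExecuteQuery API and the `tp` instance from the earlier
example:

    auto it = tp->ExecuteQuery(
        "SELECT value FROM stats WHERE name = 'sorter_push_event_out_of_order'");
    if (it.Next()) {
      // Non-zero means some events reached the parsing stage with decreasing
      // timestamps even after sorting.
      int64_t out_of_order_count = it.Get(0).long_value;
    }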
diff --git a/src/trace_processor/trace_sorter.cc b/src/trace_processor/trace_sorter.cc
index c1bc9e5..a3907dd 100644
--- a/src/trace_processor/trace_sorter.cc
+++ b/src/trace_processor/trace_sorter.cc
@@ -24,9 +24,12 @@
 namespace perfetto {
 namespace trace_processor {
 
-TraceSorter::TraceSorter(std::unique_ptr<TraceParser> parser,
-                         int64_t window_size_ns)
-    : parser_(std::move(parser)), window_size_ns_(window_size_ns) {
+TraceSorter::TraceSorter(TraceProcessorContext* context,
+                         std::unique_ptr<TraceParser> parser,
+                         SortingMode sorting_mode)
+    : context_(context),
+      parser_(std::move(parser)),
+      sorting_mode_(sorting_mode) {
   const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
   bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
   if (bypass_next_stage_for_testing_)
@@ -56,10 +59,10 @@
   PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
 }
 
-// Removes all the events in |queues_| that are earlier than the given window
-// size and moves them to the next parser stages, respecting global timestamp
-// order. This function is a "extract min from N sorted queues", with some
-// little cleverness: we know that events tend to be bursty, so events are
+// Removes all the events in |queues_| that are earlier than the given
+// packet index and moves them to the next parser stages, respecting global
+// timestamp order. This function is an "extract min from N sorted queues",
+// with some cleverness: we know that events tend to be bursty, so events are
 // not going to be randomly distributed on the N |queues_|.
 // Upon each iteration this function finds the first two queues (if any) that
 // have the oldest events, and extracts events from the 1st until hitting the
@@ -76,12 +79,8 @@
 // to avoid re-scanning all the queues all the times) but doesn't seem worth it.
 // With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
 // time in a profiler.
-void TraceSorter::SortAndExtractEventsBeyondWindow(int64_t window_size_ns) {
-  DCHECK_ftrace_batch_cpu(kNoBatch);
-
+void TraceSorter::SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx) {
   constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
-  const bool was_empty = global_min_ts_ == kTsMax && global_max_ts_ == 0;
-  int64_t extract_end_ts = global_max_ts_ - window_size_ns;
   size_t iterations = 0;
   for (;; iterations++) {
     size_t min_queue_idx = 0;  // The index of the queue with the min(ts).
@@ -122,13 +121,14 @@
     PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_);
 
     // Now that we identified the min-queue, extract all events from it until
-    // we hit either: (1) the min-ts of the 2nd queue or (2) the window limit,
-    // whichever comes first.
-    int64_t extract_until_ts = std::min(extract_end_ts, min_queue_ts[1]);
+    // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
+    // limit, whichever comes first.
     size_t num_extracted = 0;
     for (auto& event : events) {
-      if (event.timestamp > extract_until_ts)
+      if (event.packet_idx >= limit_packet_idx ||
+          event.timestamp > min_queue_ts[1]) {
         break;
+      }
 
       ++num_extracted;
       MaybePushEvent(min_queue_idx, std::move(event));
@@ -162,12 +162,6 @@
     }
   }  // for(;;)
 
-  // We decide to extract events only when we know (using the global_{min,max}
-  // bounds) that there are eligible events. We should never end up in a
-  // situation where we call this function but then realize that there was
-  // nothing to extract.
-  PERFETTO_DCHECK(iterations > 0 || was_empty);
-
 #if PERFETTO_DCHECK_IS_ON()
   // Check that the global min/max are consistent.
   int64_t dbg_min_ts = kTsMax;
@@ -182,10 +176,15 @@
 }
 
 void TraceSorter::MaybePushEvent(size_t queue_idx, TimestampedTracePiece ttp) {
+  int64_t timestamp = ttp.timestamp;
+  if (timestamp < latest_pushed_event_ts_)
+    context_->storage->IncrementStats(stats::sorter_push_event_out_of_order);
+
+  latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
+
   if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_))
     return;
 
-  int64_t timestamp = ttp.timestamp;
   if (queue_idx == 0) {
     // queues_[0] is for non-ftrace packets.
     parser_->ParseTracePacket(timestamp, std::move(ttp));
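
The merge-sort-extract comment above compresses a fair amount of logic. Below
is a simplified, self-contained sketch of the "extract min from N sorted
queues" loop it describes; Event, Queue and the push callback are hypothetical
stand-ins, and the real implementation additionally re-sorts queues that lost
ordering and maintains the global min/max bounds:

    #include <cstdint>
    #include <deque>
    #include <limits>
    #include <vector>

    struct Event { int64_t timestamp; uint64_t packet_idx; };
    using Queue = std::deque<Event>;  // Each queue is sorted by timestamp.

    template <typename PushFn>
    void SortAndExtractSketch(std::vector<Queue>& queues,
                              uint64_t limit_packet_idx,
                              PushFn push_to_next_stage) {
      constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
      for (;;) {
        // Find the queue with the smallest head timestamp and the runner-up.
        size_t min_q = 0;
        int64_t min_ts = kTsMax;
        int64_t runner_up_ts = kTsMax;
        for (size_t i = 0; i < queues.size(); i++) {
          if (queues[i].empty()) continue;
          int64_t ts = queues[i].front().timestamp;
          if (ts < min_ts) {
            runner_up_ts = min_ts;
            min_ts = ts;
            min_q = i;
          } else if (ts < runner_up_ts) {
            runner_up_ts = ts;
          }
        }
        if (min_ts == kTsMax) return;  // All queues are empty.

        // Drain the min queue until it would overtake the runner-up queue or
        // hit the packet-index limit. Events are bursty per CPU, so long runs
        // come out of a single queue between recomputations.
        size_t num_extracted = 0;
        while (!queues[min_q].empty() &&
               queues[min_q].front().packet_idx < limit_packet_idx &&
               queues[min_q].front().timestamp <= runner_up_ts) {
          push_to_next_stage(queues[min_q].front());
          queues[min_q].pop_front();
          num_extracted++;
        }
        if (num_extracted == 0) return;  // Head is beyond the packet limit.
      }
    }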
diff --git a/src/trace_processor/trace_sorter.h b/src/trace_processor/trace_sorter.h
index a93cd26..50bec23 100644
--- a/src/trace_processor/trace_sorter.h
+++ b/src/trace_processor/trace_sorter.h
@@ -39,21 +39,40 @@
 // This class takes care of sorting events parsed from the trace stream in
 // arbitrary order and pushing them to the next pipeline stages (parsing) in
 // order. In order to support streaming use-cases, sorting happens within a
-// max window. Events are held in the TraceSorter staging area (events_) until
-// either (1) the (max - min) timestamp > window_size; (2) trace EOF.
+// window.
 //
-// This class is designed around the assumption that:
+// Events are held in the TraceSorter staging area (events_) until either:
+// 1. We can determine that it's safe to extract events by observing
+//    TracingServiceEvent Flush and ReadBuffer events, or
+// 2. The trace EOF is reached.
+//
+// Incremental extraction
+//
+// Incremental extraction happens by using a combination of flush and read
+// buffer events from the tracing service. Note that incremental extraction
+// is only applicable to write_into_file traces; ring-buffer traces are
+// implicitly sorted fully in memory because there is only a single read
+// buffer call at the end.
+//
+// The algorithm for incremental extraction is explained in detail at
+// go/trace-sorting-is-complicated.
+//
+// Sorting algorithm
+//
+// The sorting algorithm is designed around the assumption that:
 // - Most events come from ftrace.
 // - Ftrace events are sorted within each cpu most of the times.
 //
 // Due to this, this class operates as a streaming merge-sort of N+1 queues
 // (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted (if
 // necessary) before proceeding with the global merge-sort-extract.
-// When an event is pushed through, it is just appeneded to the end of one of
+//
+// When an event is pushed through, it is just appended to the end of one of
 // the N queues. While appending, we keep track of the fact that the queue
 // is still ordered or just lost ordering. When an out-of-order event is
 // detected on a queue we keep track of: (1) the offset within the queue where
 // the chaos begun, (2) the timestamp that broke the ordering.
+//
 // When we decide to extract events from the queues into the next stages of
 // the trace processor, we re-sort the events in the queue. Rather than
 // re-sorting everything all the times, we use the above knowledge to restrict
@@ -65,41 +84,43 @@
 // from there to the end.
 class TraceSorter {
  public:
-  TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);
+  enum class SortingMode {
+    kDefault,
+    kFullSort,
+  };
+
+  TraceSorter(TraceProcessorContext* context,
+              std::unique_ptr<TraceParser> parser,
+              SortingMode);
 
   inline void PushTracePacket(int64_t timestamp,
                               PacketSequenceState* state,
                               TraceBlobView packet) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    AppendNonFtraceAndMaybeExtractEvents(
-        TimestampedTracePiece(timestamp, packet_idx_++, std::move(packet),
-                              state->current_generation()));
+    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
+                                               std::move(packet),
+                                               state->current_generation()));
   }
 
   inline void PushJsonValue(int64_t timestamp, std::string json_value) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    AppendNonFtraceAndMaybeExtractEvents(
+    AppendNonFtraceEvent(
         TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
   }
 
   inline void PushFuchsiaRecord(int64_t timestamp,
                                 std::unique_ptr<FuchsiaRecord> record) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    AppendNonFtraceAndMaybeExtractEvents(
+    AppendNonFtraceEvent(
         TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
   }
 
   inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-
     int64_t timestamp = systrace_line->ts;
-    AppendNonFtraceAndMaybeExtractEvents(TimestampedTracePiece(
-        timestamp, packet_idx_++, std::move(systrace_line)));
+    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
+                                               std::move(systrace_line)));
   }
 
   inline void PushTrackEventPacket(int64_t timestamp,
                                    std::unique_ptr<TrackEventData> data) {
-    AppendNonFtraceAndMaybeExtractEvents(
+    AppendNonFtraceEvent(
         TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
   }
 
@@ -107,65 +128,54 @@
                               int64_t timestamp,
                               TraceBlobView event,
                               PacketSequenceState* state) {
-    set_ftrace_batch_cpu_for_DCHECK(cpu);
-    GetQueue(cpu + 1)->Append(TimestampedTracePiece(
+    auto* queue = GetQueue(cpu + 1);
+    queue->Append(TimestampedTracePiece(
         timestamp, packet_idx_++,
         FtraceEventData{std::move(event), state->current_generation()}));
-
-    // The caller must call FinalizeFtraceEventBatch() after having pushed a
-    // batch of ftrace events. This is to amortize the overhead of handling
-    // global ordering and doing that in batches only after all ftrace events
-    // for a bundle are pushed.
+    UpdateGlobalTs(queue);
   }
   inline void PushInlineFtraceEvent(uint32_t cpu,
                                     int64_t timestamp,
                                     InlineSchedSwitch inline_sched_switch) {
-    set_ftrace_batch_cpu_for_DCHECK(cpu);
-    GetQueue(cpu + 1)->Append(
-        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
-
-    // As with |PushFtraceEvent|, doesn't immediately sort the affected queues.
     // TODO(rsavitski): if a trace has a mix of normal & "compact" events (being
     // pushed through this function), the ftrace batches will no longer be fully
     // sorted by timestamp. In such situations, we will have to sort at the end
     // of the batch. We can do better as both sub-sequences are sorted however.
     // Consider adding extra queues, or pushing them in a merge-sort fashion
     // instead.
+    auto* queue = GetQueue(cpu + 1);
+    queue->Append(
+        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
+    UpdateGlobalTs(queue);
   }
   inline void PushInlineFtraceEvent(uint32_t cpu,
                                     int64_t timestamp,
                                     InlineSchedWaking inline_sched_waking) {
-    set_ftrace_batch_cpu_for_DCHECK(cpu);
-    GetQueue(cpu + 1)->Append(
+    auto* queue = GetQueue(cpu + 1);
+    queue->Append(
         TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
-  }
-  inline void FinalizeFtraceEventBatch(uint32_t cpu) {
-    DCHECK_ftrace_batch_cpu(cpu);
-    set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
-    MaybeExtractEvents(GetQueue(cpu + 1));
+    UpdateGlobalTs(queue);
   }
 
-  // Extract all events ignoring the window.
   void ExtractEventsForced() {
-    SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
+    SortAndExtractEventsUntilPacket(packet_idx_);
     queues_.resize(0);
+
+    packet_idx_for_extraction_ = packet_idx_;
+    flushes_since_extraction_ = 0;
   }
 
-  // Sets the window size to be the size specified (which should be lower than
-  // any previous window size specified) and flushes any data beyond
-  // this window size.
-  // It is undefined to call this function with a window size greater than than
-  // the current size.
-  void SetWindowSizeNs(int64_t window_size_ns) {
-    PERFETTO_DCHECK(window_size_ns <= window_size_ns_);
+  void NotifyFlushEvent() { flushes_since_extraction_++; }
 
-    PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
-    window_size_ns_ = window_size_ns;
-
-    // Fast path: if, globally, we are within the window size, then just exit.
-    if (global_max_ts_ - global_min_ts_ < window_size_ns)
+  void NotifyReadBufferEvent() {
+    if (sorting_mode_ == SortingMode::kFullSort ||
+        flushes_since_extraction_ < 2) {
       return;
-    SortAndExtractEventsBeyondWindow(window_size_ns_);
+    }
+
+    SortAndExtractEventsUntilPacket(packet_idx_for_extraction_);
+    packet_idx_for_extraction_ = packet_idx_;
+    flushes_since_extraction_ = 0;
   }
 
   int64_t max_timestamp() const { return global_max_ts_; }
@@ -210,9 +220,7 @@
     int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
   };
 
-  // This method passes any events older than window_size_ns to the
-  // parser to be parsed and then stored.
-  void SortAndExtractEventsBeyondWindow(int64_t windows_size_ns);
+  void SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx);
 
   inline Queue* GetQueue(size_t index) {
     if (PERFETTO_UNLIKELY(index >= queues_.size()))
@@ -220,57 +228,40 @@
     return &queues_[index];
   }
 
-  inline void AppendNonFtraceAndMaybeExtractEvents(TimestampedTracePiece ttp) {
-    // Fast path: if this event is before all other events in the sorter and
-    // happened more than the window size in the past, just push the event to
-    // the next stage. This saves all the sorting logic which would simply move
-    // this event to the head of the queue and then extract it out.
-    //
-    // In practice, these events will be rejected as being "out-of-order" later
-    // on in trace processor (i.e. in EventTracker or SliceTracker); we don't
-    // drop here to allow them to track packet drop stats.
-    //
-    // See b/188392852 for an example of where this condition would be hit in
-    // practice.
-    bool is_before_all_events = ttp.timestamp < global_max_ts_;
-    bool is_before_window = global_max_ts_ - ttp.timestamp >= window_size_ns_;
-    if (PERFETTO_UNLIKELY(is_before_all_events && is_before_window)) {
-      MaybePushEvent(0, std::move(ttp));
-      return;
-    }
-
-    // Slow path: append the event to the non-ftrace queue and extract any
-    // events if available.
+  inline void AppendNonFtraceEvent(TimestampedTracePiece ttp) {
     Queue* queue = GetQueue(0);
     queue->Append(std::move(ttp));
-    MaybeExtractEvents(queue);
+    UpdateGlobalTs(queue);
   }
 
-  inline void MaybeExtractEvents(Queue* queue) {
-    DCHECK_ftrace_batch_cpu(kNoBatch);
-    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
+  inline void UpdateGlobalTs(Queue* queue) {
     global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
-
-    // Fast path: if, globally, we are within the window size, then just exit.
-    if (global_max_ts_ - global_min_ts_ < window_size_ns_)
-      return;
-    SortAndExtractEventsBeyondWindow(window_size_ns_);
+    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
   }
 
   void MaybePushEvent(size_t queue_idx,
                       TimestampedTracePiece ttp) PERFETTO_ALWAYS_INLINE;
 
+  TraceProcessorContext* context_;
   std::unique_ptr<TraceParser> parser_;
 
+  // Whether we should ignore incremental extraction and just wait for
+  // forced extraction at the end of the trace.
+  SortingMode sorting_mode_ = SortingMode::kDefault;
+
+  // The packet index until which events should be extracted. Set based
+  // on the packet index in |NotifyReadBufferEvent|.
+  uint64_t packet_idx_for_extraction_ = 0;
+
+  // The number of flushes which have happened since the last incremental
+  // extraction.
+  uint32_t flushes_since_extraction_ = 0;
+
   // queues_[0] is the general (non-ftrace) queue.
   // queues_[1] is the ftrace queue for CPU(0).
   // queues_[x] is the ftrace queue for CPU(x - 1).
   std::vector<Queue> queues_;
 
-  // Events are propagated to the next stage only after (max - min) timestamp
-  // is larger than this value.
-  int64_t window_size_ns_;
-
   // max(e.timestamp for e in queues_).
   int64_t global_max_ts_ = 0;
 
@@ -283,23 +274,8 @@
   // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
   bool bypass_next_stage_for_testing_ = false;
 
-#if PERFETTO_DCHECK_IS_ON()
-  // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
-  uint32_t ftrace_batch_cpu_ = kNoBatch;
-
-  inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
-    PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
-  }
-
-  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
-    PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
-                    cpu == kNoBatch);
-    ftrace_batch_cpu_ = cpu;
-  }
-#else
-  inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
-  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
-#endif
+  // max(e.ts for e pushed to next stage)
+  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
 };
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/trace_sorter_unittest.cc b/src/trace_processor/trace_sorter_unittest.cc
index b49f515..18c6d1e 100644
--- a/src/trace_processor/trace_sorter_unittest.cc
+++ b/src/trace_processor/trace_sorter_unittest.cc
@@ -76,13 +76,16 @@
       : test_buffer_(std::unique_ptr<uint8_t[]>(new uint8_t[8]), 0, 8) {
     storage_ = new NiceMock<MockTraceStorage>();
     context_.storage.reset(storage_);
+    CreateSorter();
+  }
 
+  void CreateSorter(bool full_sort = true) {
     std::unique_ptr<MockTraceParser> parser(new MockTraceParser(&context_));
     parser_ = parser.get();
-
+    auto sorting_mode = full_sort ? TraceSorter::SortingMode::kFullSort
+                                  : TraceSorter::SortingMode::kDefault;
     context_.sorter.reset(
-        new TraceSorter(std::move(parser),
-                        std::numeric_limits<int64_t>::max() /*window_size*/));
+        new TraceSorter(&context_, std::move(parser), sorting_mode));
   }
 
  protected:
@@ -98,7 +101,6 @@
   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view.data(), 1));
   context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
                                    std::move(view), &state);
-  context_.sorter->FinalizeFtraceEventBatch(0);
   context_.sorter->ExtractEventsForced();
 }
 
@@ -107,7 +109,6 @@
   TraceBlobView view = test_buffer_.slice(0, 1);
   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, view.data(), 1));
   context_.sorter->PushTracePacket(1000, &state, std::move(view));
-  context_.sorter->FinalizeFtraceEventBatch(1000);
   context_.sorter->ExtractEventsForced();
 }
 
@@ -125,64 +126,134 @@
   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
 
-  context_.sorter->SetWindowSizeNs(200);
   context_.sorter->PushFtraceEvent(2 /*cpu*/, 1200 /*timestamp*/,
                                    std::move(view_4), &state);
-  context_.sorter->FinalizeFtraceEventBatch(2);
   context_.sorter->PushTracePacket(1001, &state, std::move(view_2));
   context_.sorter->PushTracePacket(1100, &state, std::move(view_3));
   context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
                                    std::move(view_1), &state);
-
-  context_.sorter->FinalizeFtraceEventBatch(0);
   context_.sorter->ExtractEventsForced();
 }
 
-TEST_F(TraceSorterTest, SetWindowSize) {
+TEST_F(TraceSorterTest, IncrementalExtraction) {
+  CreateSorter(false);
+
   PacketSequenceState state(&context_);
+
+  TraceBlobView view_1 = test_buffer_.slice(0, 1);
+  TraceBlobView view_2 = test_buffer_.slice(0, 2);
+  TraceBlobView view_3 = test_buffer_.slice(0, 3);
+  TraceBlobView view_4 = test_buffer_.slice(0, 4);
+  TraceBlobView view_5 = test_buffer_.slice(0, 5);
+
+  // Flush at the start of the packet sequence to match the behavior of the
+  // service.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1200, &state, std::move(view_2));
+  context_.sorter->PushTracePacket(1100, &state, std::move(view_1));
+
+  // No data should be extracted at this point because we haven't
+  // seen two flushes yet.
+  context_.sorter->NotifyReadBufferEvent();
+
+  // Now that we've seen two flushes, we should be ready to start extracting
+  // data on the next ReadBuffer call (after two flushes as usual).
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyReadBufferEvent();
+
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1400, &state, std::move(view_4));
+  context_.sorter->PushTracePacket(1300, &state, std::move(view_3));
+
+  // This ReadBuffer call should finally extract the packets pushed before
+  // the first ReadBuffer call.
+  {
+    InSequence s;
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
+  }
+  context_.sorter->NotifyReadBufferEvent();
+
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1500, &state, std::move(view_5));
+
+  // Nothing should be extracted as we haven't seen the second flush.
+  context_.sorter->NotifyReadBufferEvent();
+
+  // Now we've seen the second flush we should extract the next two packets.
+  context_.sorter->NotifyFlushEvent();
+  {
+    InSequence s;
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1300, test_buffer_.data(), 3));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1400, test_buffer_.data(), 4));
+  }
+  context_.sorter->NotifyReadBufferEvent();
+
+  // The forced extraction should get the last packet.
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1500, test_buffer_.data(), 5));
+  context_.sorter->ExtractEventsForced();
+}
+
+// Simulate a producer bug where the third packet is emitted
+// out of order. Verify that we track the stats correctly.
+TEST_F(TraceSorterTest, OutOfOrder) {
+  CreateSorter(false);
+
+  PacketSequenceState state(&context_);
+
   TraceBlobView view_1 = test_buffer_.slice(0, 1);
   TraceBlobView view_2 = test_buffer_.slice(0, 2);
   TraceBlobView view_3 = test_buffer_.slice(0, 3);
   TraceBlobView view_4 = test_buffer_.slice(0, 4);
 
-  MockFunction<void(std::string check_point_name)> check;
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1200, &state, std::move(view_2));
+  context_.sorter->PushTracePacket(1100, &state, std::move(view_1));
+  context_.sorter->NotifyReadBufferEvent();
 
+  // Both of the packets should have been pushed through.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
   {
     InSequence s;
-
-    EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view_1.data(), 1));
-    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, view_2.data(), 2));
-    EXPECT_CALL(check, Call("1"));
-    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
-    EXPECT_CALL(check, Call("2"));
-    EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
+    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
   }
+  context_.sorter->NotifyReadBufferEvent();
 
-  context_.sorter->SetWindowSizeNs(200);
-  context_.sorter->PushFtraceEvent(2 /*cpu*/, 1200 /*timestamp*/,
-                                   std::move(view_4), &state);
-  context_.sorter->FinalizeFtraceEventBatch(2);
-  context_.sorter->PushTracePacket(1001, &state, std::move(view_2));
-  context_.sorter->PushTracePacket(1100, &state, std::move(view_3));
+  // Now, pass the third packet out of order.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1150, &state, std::move(view_3));
+  context_.sorter->NotifyReadBufferEvent();
 
-  context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
-                                   std::move(view_1), &state);
-  context_.sorter->FinalizeFtraceEventBatch(0);
+  // The third packet should still be pushed through.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1150, test_buffer_.data(), 3));
+  context_.sorter->NotifyReadBufferEvent();
 
-  // At this point, we should just flush the 1000 and 1001 packets.
-  context_.sorter->SetWindowSizeNs(101);
+  // But we should also increment the stat that this was out of order.
+  ASSERT_EQ(
+      context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
+      1);
 
-  // Inform the mock about where we are.
-  check.Call("1");
+  // Push the fourth packet, also out of order but after the third.
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->NotifyFlushEvent();
+  context_.sorter->PushTracePacket(1170, &state, std::move(view_4));
+  context_.sorter->NotifyReadBufferEvent();
 
-  // Now we should flush the 1100 packet.
-  context_.sorter->SetWindowSizeNs(99);
-
-  // Inform the mock about where we are.
-  check.Call("2");
-
-  // Now we should flush the 1200 packet.
+  // The fourth packet should still be pushed through.
+  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1170, test_buffer_.data(), 4));
   context_.sorter->ExtractEventsForced();
+
+  // But we should also increment the stat that this was out of order.
+  ASSERT_EQ(
+      context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
+      2);
 }
 
 // Simulates a random stream of ftrace events happening on random CPUs.
@@ -219,7 +290,6 @@
       expectations[ts].push_back(cpu);
       context_.sorter->PushFtraceEvent(cpu, ts, TraceBlobView(nullptr, 0, 0),
                                        &state);
-      context_.sorter->FinalizeFtraceEventBatch(cpu);
     }
   }