Merge "Improve chrome_slice rendering perf"
diff --git a/CHANGELOG b/CHANGELOG
index 0861716..e63367e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -11,6 +11,12 @@
new-line when specifying a query file with -q to trace processor shell.
* Added "ancestor_slice_by_stack" and "descendant_slice_by_stack" table
functions to walk up and down the slice stacks.
+ * Overhauled windowed sorting to be based on packet ordering and
+ lifecycle events inside the trace instead of time-based ordering.
+ * Deprecated |SortingMode::kForceFlushPeriodWindowedSort| due to
+ windowed sorting changes. Embedders should switch to
+ |SortingMode::kDefaultHeuristics|; this option will be removed
+ in v21.
UI:
*
SDK:
diff --git a/include/perfetto/trace_processor/basic_types.h b/include/perfetto/trace_processor/basic_types.h
index 05735b9..5b459d0 100644
--- a/include/perfetto/trace_processor/basic_types.h
+++ b/include/perfetto/trace_processor/basic_types.h
@@ -40,11 +40,12 @@
constexpr char kMetricProtoRoot[] = "protos/perfetto/metrics/";
// Enum which encodes how trace processor should try to sort the ingested data.
+// Note that these options are only applicable to proto traces; other trace
+// types (e.g. JSON, Fuchsia) use full sorts.
enum class SortingMode {
// This option allows trace processor to use built-in heuristics about how to
// sort the data. Generally, this option is correct for most embedders as
- // trace processor reads information from the trace (e.g. TraceConfig) to make
- // the best decision.
+ // trace processor reads information from the trace to make the best decision.
//
// The exact heuristics are implementation details but will ensure that all
// relevant tables are sorted by timestamp.
@@ -58,14 +59,19 @@
// data to be skipped.
kForceFullSort = 1,
- // This option forces trace processor to use the |flush_period_ms| specified
- // in the TraceConfig to perform a windowed sort of the data. The window size
- // is not guaranteed to be exactly |flush_period_ms| but will be of the same
- // order of magnitude; the exact value is an implementation detail and should
- // not be relied upon.
+ // This option is deprecated in v18; trace processor will ignore it and
+ // use |kDefaultHeuristics|.
//
- // If a |flush_period_ms| is not specified in the TraceConfig, this mode will
- // act the same as |SortingMode::kDefaultHeuristics|.
+ // Rationale for deprecation:
+ // The new windowed sorting logic in trace processor uses a combination of
+ // flush and buffer-read lifecycle events inside the trace instead of
+ // using time-periods from the config.
+ //
+ // Recommended migration:
+ // Users of this option should switch to using |kDefaultHeuristics| which
+ // will act very similarly to the pre-v20 behaviour of this option.
+ //
+ // This option is scheduled to be removed in v21.
kForceFlushPeriodWindowedSort = 2
};
diff --git a/src/trace_processor/forwarding_trace_parser.cc b/src/trace_processor/forwarding_trace_parser.cc
index 94bcc67..dc2cf54 100644
--- a/src/trace_processor/forwarding_trace_parser.cc
+++ b/src/trace_processor/forwarding_trace_parser.cc
@@ -40,6 +40,17 @@
return str;
}
+TraceSorter::SortingMode ConvertSortingMode(SortingMode sorting_mode) {
+ switch (sorting_mode) {
+ case SortingMode::kDefaultHeuristics:
+ case SortingMode::kForceFlushPeriodWindowedSort:
+ return TraceSorter::SortingMode::kDefault;
+ case SortingMode::kForceFullSort:
+ return TraceSorter::SortingMode::kFullSort;
+ }
+ PERFETTO_FATAL("For GCC");
+}
+
// Fuchsia traces have a magic number as documented here:
// https://fuchsia.googlesource.com/fuchsia/+/HEAD/docs/development/tracing/trace-format/README.md#magic-number-record-trace-info-type-0
constexpr uint64_t kFuchsiaMagicNumber = 0x0016547846040010;
@@ -55,7 +66,6 @@
size_t size) {
// If this is the first Parse() call, guess the trace type and create the
// appropriate parser.
- static const int64_t kMaxWindowSize = std::numeric_limits<int64_t>::max();
if (!reader_) {
TraceType trace_type;
{
@@ -70,8 +80,9 @@
reader_ = std::move(context_->json_trace_tokenizer);
// JSON traces have no guarantees about the order of events in them.
- context_->sorter.reset(new TraceSorter(
- std::move(context_->json_trace_parser), kMaxWindowSize));
+ context_->sorter.reset(
+ new TraceSorter(context_, std::move(context_->json_trace_parser),
+ TraceSorter::SortingMode::kFullSort));
} else {
return util::ErrStatus("JSON support is disabled");
}
@@ -79,12 +90,12 @@
}
case kProtoTraceType: {
PERFETTO_DLOG("Proto trace detected");
- // This will be reduced once we read the trace config and we see flush
- // period being set.
+ auto sorting_mode = ConvertSortingMode(context_->config.sorting_mode);
reader_.reset(new ProtoTraceReader(context_));
context_->sorter.reset(new TraceSorter(
+ context_,
std::unique_ptr<TraceParser>(new ProtoTraceParser(context_)),
- kMaxWindowSize));
+ sorting_mode));
context_->process_tracker->SetPidZeroIgnoredForIdleProcess();
break;
}
@@ -101,7 +112,8 @@
// Fuschia traces can have massively out of order events.
context_->sorter.reset(new TraceSorter(
- std::move(context_->fuchsia_trace_parser), kMaxWindowSize));
+ context_, std::move(context_->fuchsia_trace_parser),
+ TraceSorter::SortingMode::kFullSort));
} else {
return util::ErrStatus("Fuchsia support is disabled");
}
diff --git a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
index 5aa9e38..7253e8a 100644
--- a/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
+++ b/src/trace_processor/importers/ftrace/ftrace_tokenizer.cc
@@ -93,7 +93,6 @@
size_t off = bundle.offset_of(it->data());
TokenizeFtraceEvent(cpu, clock_id, bundle.slice(off, it->size()), state);
}
- context_->sorter->FinalizeFtraceEventBatch(cpu);
return base::OkStatus();
}
diff --git a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
index c0f11cd..60c1606 100644
--- a/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
+++ b/src/trace_processor/importers/proto/proto_trace_parser_unittest.cc
@@ -252,7 +252,8 @@
context_.flow_tracker.reset(flow_);
clock_ = new ClockTracker(&context_);
context_.clock_tracker.reset(clock_);
- context_.sorter.reset(new TraceSorter(CreateParser(), 0 /*window size*/));
+ context_.sorter.reset(new TraceSorter(&context_, CreateParser(),
+ TraceSorter::SortingMode::kFullSort));
context_.descriptor_pool_.reset(new DescriptorPool());
RegisterDefaultModules(&context_);
@@ -333,6 +334,7 @@
PushSchedSwitch(10, 1000, 10, base::StringView(kProc2Name), 256,
32, 100, base::StringView(kProc1Name), 1024));
Tokenize();
+ context_.sorter->ExtractEventsForced();
}
TEST_F(ProtoTraceParserTest, LoadEventsIntoRaw) {
@@ -364,6 +366,7 @@
EXPECT_CALL(*process_, GetOrCreateProcess(123));
Tokenize();
+ context_.sorter->ExtractEventsForced();
const auto& raw = context_.storage->raw_table();
ASSERT_EQ(raw.row_count(), 2u);
@@ -414,6 +417,7 @@
field->set_uint_value(3);
Tokenize();
+ context_.sorter->ExtractEventsForced();
const auto& raw = storage_->raw_table();
@@ -481,6 +485,7 @@
32, 10, base::StringView(kProcName2), 512));
Tokenize();
+ context_.sorter->ExtractEventsForced();
}
TEST_F(ProtoTraceParserTest, LoadMultiplePackets) {
@@ -526,6 +531,7 @@
PushSchedSwitch(10, 1001, 100, base::StringView(kProcName1), 256,
32, 10, base::StringView(kProcName2), 512));
Tokenize();
+ context_.sorter->ExtractEventsForced();
}
TEST_F(ProtoTraceParserTest, RepeatedLoadSinglePacket) {
@@ -548,6 +554,7 @@
PushSchedSwitch(10, 1000, 10, base::StringView(kProcName2), 256,
32, 100, base::StringView(kProcName1), 1024));
Tokenize();
+ context_.sorter->ExtractEventsForced();
bundle = trace_->add_packet()->set_ftrace_events();
bundle->set_cpu(10);
@@ -567,6 +574,7 @@
PushSchedSwitch(10, 1001, 100, base::StringView(kProcName1), 256,
32, 10, base::StringView(kProcName2), 512));
Tokenize();
+ context_.sorter->ExtractEventsForced();
}
TEST_F(ProtoTraceParserTest, LoadCpuFreq) {
@@ -581,6 +589,7 @@
EXPECT_CALL(*event_, PushCounter(1000, DoubleEq(2000), TrackId{0}));
Tokenize();
+ context_.sorter->ExtractEventsForced();
EXPECT_EQ(context_.storage->cpu_counter_track_table().cpu()[0], 10u);
}
@@ -598,6 +607,7 @@
EXPECT_CALL(*event_, PushCounter(static_cast<int64_t>(ts),
DoubleEq(value * 1024.0), TrackId{0u}));
Tokenize();
+ context_.sorter->ExtractEventsForced();
EXPECT_EQ(context_.storage->track_table().row_count(), 1u);
}
@@ -615,6 +625,7 @@
EXPECT_CALL(*event_, PushCounter(static_cast<int64_t>(ts), DoubleEq(value),
TrackId{0u}));
Tokenize();
+ context_.sorter->ExtractEventsForced();
EXPECT_EQ(context_.storage->track_table().row_count(), 1u);
}
@@ -632,6 +643,7 @@
SetProcessMetadata(1, Eq(3u), base::StringView(kProcName1),
base::StringView(kProcName1)));
Tokenize();
+ context_.sorter->ExtractEventsForced();
}
TEST_F(ProtoTraceParserTest, LoadProcessPacket_FirstCmdline) {
@@ -649,6 +661,7 @@
SetProcessMetadata(1, Eq(3u), base::StringView(kProcName1),
base::StringView("proc1 proc2")));
Tokenize();
+ context_.sorter->ExtractEventsForced();
}
TEST_F(ProtoTraceParserTest, LoadThreadPacket) {
@@ -659,11 +672,10 @@
EXPECT_CALL(*process_, UpdateThread(1, 2));
Tokenize();
+ context_.sorter->ExtractEventsForced();
}
TEST_F(ProtoTraceParserTest, ProcessNameFromProcessDescriptor) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -707,8 +719,6 @@
}
TEST_F(ProtoTraceParserTest, ThreadNameFromThreadDescriptor) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -763,9 +773,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithoutInternedData) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -815,8 +822,7 @@
Tokenize();
- EXPECT_CALL(*process_, UpdateThread(16, 15))
- .WillRepeatedly(Return(1));
+ EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
tables::ThreadTable::Row row(16);
row.upid = 1u;
@@ -860,9 +866,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithoutInternedDataWithTypes) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -909,8 +912,7 @@
Tokenize();
- EXPECT_CALL(*process_, UpdateThread(16, 15))
- .WillRepeatedly(Return(1));
+ EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
tables::ThreadTable::Row row(16);
row.upid = 1u;
@@ -953,9 +955,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithInternedData) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -1087,8 +1086,7 @@
Tokenize();
- EXPECT_CALL(*process_, UpdateThread(16, 15))
- .WillRepeatedly(Return(1));
+ EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
tables::ThreadTable::Row row(16);
row.upid = 2u;
@@ -1178,9 +1176,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventAsyncEvents) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -1334,9 +1329,6 @@
// TODO(eseckler): Also test instant events on separate tracks.
TEST_F(ProtoTraceParserTest, TrackEventWithTrackDescriptors) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
// Sequence 1.
{
auto* packet = trace_->add_packet();
@@ -1540,9 +1532,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithResortedCounterDescriptor) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
// Descriptors with timestamps after the event below. They will be tokenized
// in the order they appear here, but then resorted before parsing to appear
// after the events below.
@@ -1643,9 +1632,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithoutIncrementalStateReset) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -1704,9 +1690,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithoutThreadDescriptor) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
// Event should be discarded because it specifies delta timestamps and no
// thread descriptor was seen yet.
@@ -1746,9 +1729,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithDataLoss) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -1762,7 +1742,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1010.
+ event->set_timestamp_delta_us(10); // absolute: 1010.
event->add_category_iids(1);
auto* legacy_event = event->set_legacy_event();
legacy_event->set_name_iid(1);
@@ -1815,7 +1795,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 2010.
+ event->set_timestamp_delta_us(10); // absolute: 2010.
event->add_category_iids(1);
auto* legacy_event = event->set_legacy_event();
legacy_event->set_name_iid(1);
@@ -1840,9 +1820,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventMultipleSequences) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -1856,7 +1833,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1010.
+ event->set_timestamp_delta_us(10); // absolute: 1010.
event->add_category_iids(1);
auto* legacy_event = event->set_legacy_event();
legacy_event->set_name_iid(1);
@@ -1883,7 +1860,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(2);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1005.
+ event->set_timestamp_delta_us(10); // absolute: 1005.
event->add_category_iids(1);
auto* legacy_event = event->set_legacy_event();
legacy_event->set_name_iid(1);
@@ -1901,7 +1878,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1020.
+ event->set_timestamp_delta_us(10); // absolute: 1020.
event->add_category_iids(1);
auto* legacy_event = event->set_legacy_event();
legacy_event->set_name_iid(1);
@@ -1911,7 +1888,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(2);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1015.
+ event->set_timestamp_delta_us(10); // absolute: 1015.
event->add_category_iids(1);
auto* legacy_event = event->set_legacy_event();
legacy_event->set_name_iid(1);
@@ -1920,10 +1897,8 @@
Tokenize();
- EXPECT_CALL(*process_, UpdateThread(16, 15))
- .WillRepeatedly(Return(1));
- EXPECT_CALL(*process_, UpdateThread(17, 15))
- .WillRepeatedly(Return(2));
+ EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
+ EXPECT_CALL(*process_, UpdateThread(17, 15)).WillRepeatedly(Return(2));
tables::ThreadTable::Row t1(16);
t1.upid = 1u;
@@ -1950,8 +1925,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithDebugAnnotations) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
MockBoundInserter inserter;
{
@@ -1967,7 +1940,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1010.
+ event->set_timestamp_delta_us(10); // absolute: 1010.
event->add_category_iids(1);
auto* annotation1 = event->add_debug_annotations();
annotation1->set_name_iid(1);
@@ -2019,7 +1992,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1020.
+ event->set_timestamp_delta_us(10); // absolute: 1020.
event->add_category_iids(1);
auto* annotation3 = event->add_debug_annotations();
annotation3->set_name_iid(3);
@@ -2153,9 +2126,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithTaskExecution) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -2169,7 +2139,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1010.
+ event->set_timestamp_delta_us(10); // absolute: 1010.
event->add_category_iids(1);
auto* task_execution = event->set_task_execution();
task_execution->set_posted_from_iid(1);
@@ -2218,9 +2188,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventWithLogMessage) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -2234,7 +2201,7 @@
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
auto* event = packet->set_track_event();
- event->set_timestamp_delta_us(10); // absolute: 1010.
+ event->set_timestamp_delta_us(10); // absolute: 1010.
event->add_category_iids(1);
auto* log_message = event->set_log_message();
@@ -2294,9 +2261,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventParseLegacyEventIntoRawTable) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -2398,9 +2362,6 @@
}
TEST_F(ProtoTraceParserTest, TrackEventLegacyTimestampsWithClockSnapshot) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
clock_->AddSnapshot({{protos::pbzero::BUILTIN_CLOCK_BOOTTIME, 0},
{protos::pbzero::BUILTIN_CLOCK_MONOTONIC, 1000000}});
@@ -2442,9 +2403,6 @@
}
TEST_F(ProtoTraceParserTest, ParseEventWithClockIdButWithoutClockSnapshot) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_timestamp(1000);
@@ -2471,9 +2429,6 @@
static const char kIntName[] = "int_name";
static const int kIntValue = 123;
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_timestamp(1000);
@@ -2511,9 +2466,6 @@
static const char kStringName[] = "string_name";
static const char kStringValue[] = "string_value";
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_timestamp(1000);
@@ -2558,9 +2510,6 @@
static const char kDataPart1[] = "bbb";
static const char kFullData[] = "aaabbb";
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -2588,9 +2537,6 @@
TEST_F(ProtoTraceParserTest, ParseChromeLegacyJsonIntoRawTable) {
static const char kUserTraceEvent[] = "{\"user\":1}";
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
{
auto* packet = trace_->add_packet();
packet->set_trusted_packet_sequence_id(1);
@@ -2622,9 +2568,6 @@
static const char kTag1[] = "tag1";
static const char kTag2[] = "tag2";
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
auto* metadata = trace_->add_packet()->set_chrome_benchmark_metadata();
metadata->set_benchmark_name(kName);
metadata->add_story_tags(kTag1);
@@ -2653,9 +2596,6 @@
}
TEST_F(ProtoTraceParserTest, LoadChromeMetadata) {
- context_.sorter.reset(new TraceSorter(
- CreateParser(), std::numeric_limits<int64_t>::max() /*window size*/));
-
auto* track_event = trace_->add_packet()->set_chrome_events();
{
auto* metadata = track_event->add_metadata();
@@ -2723,6 +2663,7 @@
}
Tokenize();
+ context_.sorter->ExtractEventsForced();
// Packet-level errors reflected in stats storage.
const auto& stats = context_.storage->stats();
@@ -2775,6 +2716,7 @@
}
Tokenize();
+ context_.sorter->ExtractEventsForced();
// Packet-level errors reflected in stats storage.
const auto& stats = context_.storage->stats();
@@ -2860,10 +2802,10 @@
samples->set_process_priority(30);
}
- EXPECT_CALL(*process_, UpdateThread(16, 15))
- .WillRepeatedly(Return(1));
+ EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
Tokenize();
+ context_.sorter->ExtractEventsForced();
// Verify cpu_profile_samples.
const auto& samples = storage_->cpu_profile_stack_sample_table();
@@ -2950,6 +2892,7 @@
EXPECT_CALL(*process_, UpdateThread(16, 15)).WillRepeatedly(Return(1));
Tokenize();
+ context_.sorter->ExtractEventsForced();
const auto& samples = storage_->cpu_profile_stack_sample_table();
EXPECT_EQ(samples.row_count(), 1u);
@@ -2966,6 +2909,7 @@
config->set_trace_uuid_msb(2);
ASSERT_TRUE(Tokenize().ok());
+ context_.sorter->ExtractEventsForced();
SqlValue value =
context_.metadata_tracker->GetMetadataForTesting(metadata::trace_uuid);
@@ -2978,6 +2922,7 @@
config->add_buffers()->set_size_kb(42);
ASSERT_TRUE(Tokenize().ok());
+ context_.sorter->ExtractEventsForced();
SqlValue value = context_.metadata_tracker->GetMetadataForTesting(
metadata::trace_config_pbtxt);
diff --git a/src/trace_processor/importers/proto/proto_trace_reader.cc b/src/trace_processor/importers/proto/proto_trace_reader.cc
index 4b133a8..e8ca900 100644
--- a/src/trace_processor/importers/proto/proto_trace_reader.cc
+++ b/src/trace_processor/importers/proto/proto_trace_reader.cc
@@ -233,49 +233,12 @@
void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
protos::pbzero::TraceConfig::Decoder trace_config(blob);
- // If we're forcing a full sort, we can keep using the INT_MAX duration set
- // when we created the sorter.
- const auto& cfg = context_->config;
- if (cfg.sorting_mode == SortingMode::kForceFullSort) {
- return;
- }
-
- base::Optional<int64_t> flush_period_window_size_ns;
- if (trace_config.has_flush_period_ms() &&
- trace_config.flush_period_ms() > 0) {
- flush_period_window_size_ns =
- static_cast<int64_t>(trace_config.flush_period_ms()) * 2 * 1000 * 1000;
- }
-
- // If we're trying to force flush period based sorting, use that as the
- // window size if it exists.
- if (cfg.sorting_mode == SortingMode::kForceFlushPeriodWindowedSort &&
- flush_period_window_size_ns.has_value()) {
- context_->sorter->SetWindowSizeNs(flush_period_window_size_ns.value());
- return;
- }
-
- // If we end up here, we should use heuristics because either the sorting mode
- // was set as such or we don't have a flush period to force the window size
- // to.
-
- // If we're not forcing anything and this is a write_into_file trace, then
- // use flush_period_ms as an indiciator for how big the sliding window for the
- // sorter should be.
- if (trace_config.write_into_file()) {
- int64_t window_size_ns;
- if (flush_period_window_size_ns.has_value()) {
- window_size_ns = flush_period_window_size_ns.value();
- } else {
- constexpr uint64_t kDefaultWindowNs =
- 180 * 1000 * 1000 * 1000ULL; // 3 minutes.
- PERFETTO_ELOG(
- "It is strongly recommended to have flush_period_ms set when "
- "write_into_file is turned on. You will likely have many dropped "
- "events because of inability to sort the events correctly.");
- window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
- }
- context_->sorter->SetWindowSizeNs(window_size_ns);
+ if (trace_config.write_into_file() && !trace_config.flush_period_ms()) {
+ PERFETTO_ELOG(
+ "It is strongly recommended to have flush_period_ms set when "
+ "write_into_file is turned on. This trace will be loaded fully "
+ "into memory before sorting which increases the likelihood of "
+ "OOMs.");
}
}
@@ -448,6 +411,12 @@
context_->metadata_tracker->SetMetadata(
metadata::all_data_source_started_ns, Variadic::Integer(ts));
}
+ if (tse.all_data_sources_flushed()) {
+ context_->sorter->NotifyFlushEvent();
+ }
+ if (tse.read_tracing_buffers_completed()) {
+ context_->sorter->NotifyReadBufferEvent();
+ }
return util::OkStatus();
}
diff --git a/src/trace_processor/storage/stats.h b/src/trace_processor/storage/stats.h
index 4a05a74..ea71ef0 100644
--- a/src/trace_processor/storage/stats.h
+++ b/src/trace_processor/storage/stats.h
@@ -185,7 +185,11 @@
"the tracing service. This happens if the ftrace buffers were not " \
"cleared properly. These packets are silently dropped by trace " \
"processor."), \
- F(perf_guardrail_stop_ts, kIndexed, kDataLoss, kTrace, "")
+ F(perf_guardrail_stop_ts, kIndexed, kDataLoss, kTrace, ""), \
+ F(sorter_push_event_out_of_order, kSingle, kError, kTrace, \
"Trace events are out of order even after sorting. This can happen " \
+ "due to many factors including clock sync drift, producers emitting " \
+ "events out of order or a bug in trace processor's logic of sorting.")
// clang-format on
enum Type {
diff --git a/src/trace_processor/trace_sorter.cc b/src/trace_processor/trace_sorter.cc
index c1bc9e5..a3907dd 100644
--- a/src/trace_processor/trace_sorter.cc
+++ b/src/trace_processor/trace_sorter.cc
@@ -24,9 +24,12 @@
namespace perfetto {
namespace trace_processor {
-TraceSorter::TraceSorter(std::unique_ptr<TraceParser> parser,
- int64_t window_size_ns)
- : parser_(std::move(parser)), window_size_ns_(window_size_ns) {
+TraceSorter::TraceSorter(TraceProcessorContext* context,
+ std::unique_ptr<TraceParser> parser,
+ SortingMode sorting_mode)
+ : context_(context),
+ parser_(std::move(parser)),
+ sorting_mode_(sorting_mode) {
const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
if (bypass_next_stage_for_testing_)
@@ -56,10 +59,10 @@
PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
}
-// Removes all the events in |queues_| that are earlier than the given window
-// size and moves them to the next parser stages, respecting global timestamp
-// order. This function is a "extract min from N sorted queues", with some
-// little cleverness: we know that events tend to be bursty, so events are
+// Removes all the events in |queues_| that are earlier than the given
+// packet index and moves them to the next parser stages, respecting global
+// timestamp order. This function is a "extract min from N sorted queues", with
+// some little cleverness: we know that events tend to be bursty, so events are
// not going to be randomly distributed on the N |queues_|.
// Upon each iteration this function finds the first two queues (if any) that
// have the oldest events, and extracts events from the 1st until hitting the
@@ -76,12 +79,8 @@
// to avoid re-scanning all the queues all the times) but doesn't seem worth it.
// With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
// time in a profiler.
-void TraceSorter::SortAndExtractEventsBeyondWindow(int64_t window_size_ns) {
- DCHECK_ftrace_batch_cpu(kNoBatch);
-
+void TraceSorter::SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx) {
constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
- const bool was_empty = global_min_ts_ == kTsMax && global_max_ts_ == 0;
- int64_t extract_end_ts = global_max_ts_ - window_size_ns;
size_t iterations = 0;
for (;; iterations++) {
size_t min_queue_idx = 0; // The index of the queue with the min(ts).
@@ -122,13 +121,14 @@
PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_);
// Now that we identified the min-queue, extract all events from it until
- // we hit either: (1) the min-ts of the 2nd queue or (2) the window limit,
- // whichever comes first.
- int64_t extract_until_ts = std::min(extract_end_ts, min_queue_ts[1]);
+ // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
+ // limit, whichever comes first.
size_t num_extracted = 0;
for (auto& event : events) {
- if (event.timestamp > extract_until_ts)
+ if (event.packet_idx >= limit_packet_idx ||
+ event.timestamp > min_queue_ts[1]) {
break;
+ }
++num_extracted;
MaybePushEvent(min_queue_idx, std::move(event));
@@ -162,12 +162,6 @@
}
} // for(;;)
- // We decide to extract events only when we know (using the global_{min,max}
- // bounds) that there are eligible events. We should never end up in a
- // situation where we call this function but then realize that there was
- // nothing to extract.
- PERFETTO_DCHECK(iterations > 0 || was_empty);
-
#if PERFETTO_DCHECK_IS_ON()
// Check that the global min/max are consistent.
int64_t dbg_min_ts = kTsMax;
@@ -182,10 +176,15 @@
}
void TraceSorter::MaybePushEvent(size_t queue_idx, TimestampedTracePiece ttp) {
+ int64_t timestamp = ttp.timestamp;
+ if (timestamp < latest_pushed_event_ts_)
+ context_->storage->IncrementStats(stats::sorter_push_event_out_of_order);
+
+ latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
+
if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_))
return;
- int64_t timestamp = ttp.timestamp;
if (queue_idx == 0) {
// queues_[0] is for non-ftrace packets.
parser_->ParseTracePacket(timestamp, std::move(ttp));
diff --git a/src/trace_processor/trace_sorter.h b/src/trace_processor/trace_sorter.h
index a93cd26..50bec23 100644
--- a/src/trace_processor/trace_sorter.h
+++ b/src/trace_processor/trace_sorter.h
@@ -39,21 +39,40 @@
// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. In order to support streaming use-cases, sorting happens within a
-// max window. Events are held in the TraceSorter staging area (events_) until
-// either (1) the (max - min) timestamp > window_size; (2) trace EOF.
+// window.
//
-// This class is designed around the assumption that:
+// Events are held in the TraceSorter staging area (events_) until either:
+// 1. We can determine that it's safe to extract events by observing
+// TracingServiceEvent Flush and ReadBuffer events
+// 2. The trace EOF is reached
+//
+// Incremental extraction
+//
+// Incremental extraction happens by using a combination of flush and read
+// buffer events from the tracing service. Note that incremental extraction
+// is only applicable for write_into_file traces; ring-buffer traces will
+// be sorted fully in-memory implicitly because there is only a single read
+// buffer call at the end.
+//
+// The algorithm for incremental extraction is explained in detail at
+// go/trace-sorting-is-complicated.
+//
+// Sorting algorithm
+//
+// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the times.
//
// Due to this, this class is oprerates as a streaming merge-sort of N+1 queues
// (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted (if
// necessary) before proceeding with the global merge-sort-extract.
-// When an event is pushed through, it is just appeneded to the end of one of
+//
+// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of the fact that the queue
// is still ordered or just lost ordering. When an out-of-order event is
// detected on a queue we keep track of: (1) the offset within the queue where
// the chaos begun, (2) the timestamp that broke the ordering.
+//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything all the times, we use the above knowledge to restrict
@@ -65,41 +84,43 @@
// from there to the end.
class TraceSorter {
public:
- TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);
+ enum class SortingMode {
+ kDefault,
+ kFullSort,
+ };
+
+ TraceSorter(TraceProcessorContext* context,
+ std::unique_ptr<TraceParser> parser,
+ SortingMode);
inline void PushTracePacket(int64_t timestamp,
PacketSequenceState* state,
TraceBlobView packet) {
- DCHECK_ftrace_batch_cpu(kNoBatch);
- AppendNonFtraceAndMaybeExtractEvents(
- TimestampedTracePiece(timestamp, packet_idx_++, std::move(packet),
- state->current_generation()));
+ AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
+ std::move(packet),
+ state->current_generation()));
}
inline void PushJsonValue(int64_t timestamp, std::string json_value) {
- DCHECK_ftrace_batch_cpu(kNoBatch);
- AppendNonFtraceAndMaybeExtractEvents(
+ AppendNonFtraceEvent(
TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
}
inline void PushFuchsiaRecord(int64_t timestamp,
std::unique_ptr<FuchsiaRecord> record) {
- DCHECK_ftrace_batch_cpu(kNoBatch);
- AppendNonFtraceAndMaybeExtractEvents(
+ AppendNonFtraceEvent(
TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
}
inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
- DCHECK_ftrace_batch_cpu(kNoBatch);
-
int64_t timestamp = systrace_line->ts;
- AppendNonFtraceAndMaybeExtractEvents(TimestampedTracePiece(
- timestamp, packet_idx_++, std::move(systrace_line)));
+ AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
+ std::move(systrace_line)));
}
inline void PushTrackEventPacket(int64_t timestamp,
std::unique_ptr<TrackEventData> data) {
- AppendNonFtraceAndMaybeExtractEvents(
+ AppendNonFtraceEvent(
TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
}
@@ -107,65 +128,54 @@
int64_t timestamp,
TraceBlobView event,
PacketSequenceState* state) {
- set_ftrace_batch_cpu_for_DCHECK(cpu);
- GetQueue(cpu + 1)->Append(TimestampedTracePiece(
+ auto* queue = GetQueue(cpu + 1);
+ queue->Append(TimestampedTracePiece(
timestamp, packet_idx_++,
FtraceEventData{std::move(event), state->current_generation()}));
-
- // The caller must call FinalizeFtraceEventBatch() after having pushed a
- // batch of ftrace events. This is to amortize the overhead of handling
- // global ordering and doing that in batches only after all ftrace events
- // for a bundle are pushed.
+ UpdateGlobalTs(queue);
}
inline void PushInlineFtraceEvent(uint32_t cpu,
int64_t timestamp,
InlineSchedSwitch inline_sched_switch) {
- set_ftrace_batch_cpu_for_DCHECK(cpu);
- GetQueue(cpu + 1)->Append(
- TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
-
- // As with |PushFtraceEvent|, doesn't immediately sort the affected queues.
// TODO(rsavitski): if a trace has a mix of normal & "compact" events (being
// pushed through this function), the ftrace batches will no longer be fully
// sorted by timestamp. In such situations, we will have to sort at the end
// of the batch. We can do better as both sub-sequences are sorted however.
// Consider adding extra queues, or pushing them in a merge-sort fashion
// instead.
+ auto* queue = GetQueue(cpu + 1);
+ queue->Append(
+ TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
+ UpdateGlobalTs(queue);
}
inline void PushInlineFtraceEvent(uint32_t cpu,
int64_t timestamp,
InlineSchedWaking inline_sched_waking) {
- set_ftrace_batch_cpu_for_DCHECK(cpu);
- GetQueue(cpu + 1)->Append(
+ auto* queue = GetQueue(cpu + 1);
+ queue->Append(
TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
- }
- inline void FinalizeFtraceEventBatch(uint32_t cpu) {
- DCHECK_ftrace_batch_cpu(cpu);
- set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
- MaybeExtractEvents(GetQueue(cpu + 1));
+ UpdateGlobalTs(queue);
}
- // Extract all events ignoring the window.
void ExtractEventsForced() {
- SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
+ SortAndExtractEventsUntilPacket(packet_idx_);
queues_.resize(0);
+
+ packet_idx_for_extraction_ = packet_idx_;
+ flushes_since_extraction_ = 0;
}
- // Sets the window size to be the size specified (which should be lower than
- // any previous window size specified) and flushes any data beyond
- // this window size.
- // It is undefined to call this function with a window size greater than than
- // the current size.
- void SetWindowSizeNs(int64_t window_size_ns) {
- PERFETTO_DCHECK(window_size_ns <= window_size_ns_);
+ void NotifyFlushEvent() { flushes_since_extraction_++; }
- PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
- window_size_ns_ = window_size_ns;
-
- // Fast path: if, globally, we are within the window size, then just exit.
- if (global_max_ts_ - global_min_ts_ < window_size_ns)
+ void NotifyReadBufferEvent() {
+ if (sorting_mode_ == SortingMode::kFullSort ||
+ flushes_since_extraction_ < 2) {
return;
- SortAndExtractEventsBeyondWindow(window_size_ns_);
+ }
+
+ SortAndExtractEventsUntilPacket(packet_idx_for_extraction_);
+ packet_idx_for_extraction_ = packet_idx_;
+ flushes_since_extraction_ = 0;
}
int64_t max_timestamp() const { return global_max_ts_; }
@@ -210,9 +220,7 @@
int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
};
- // This method passes any events older than window_size_ns to the
- // parser to be parsed and then stored.
- void SortAndExtractEventsBeyondWindow(int64_t windows_size_ns);
+ void SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx);
inline Queue* GetQueue(size_t index) {
if (PERFETTO_UNLIKELY(index >= queues_.size()))
@@ -220,57 +228,40 @@
return &queues_[index];
}
- inline void AppendNonFtraceAndMaybeExtractEvents(TimestampedTracePiece ttp) {
- // Fast path: if this event is before all other events in the sorter and
- // happened more than the window size in the past, just push the event to
- // the next stage. This saves all the sorting logic which would simply move
- // this event to the head of the queue and then extract it out.
- //
- // In practice, these events will be rejected as being "out-of-order" later
- // on in trace processor (i.e. in EventTracker or SliceTracker); we don't
- // drop here to allow them to track packet drop stats.
- //
- // See b/188392852 for an example of where this condition would be hit in
- // practice.
- bool is_before_all_events = ttp.timestamp < global_max_ts_;
- bool is_before_window = global_max_ts_ - ttp.timestamp >= window_size_ns_;
- if (PERFETTO_UNLIKELY(is_before_all_events && is_before_window)) {
- MaybePushEvent(0, std::move(ttp));
- return;
- }
-
- // Slow path: append the event to the non-ftrace queue and extract any
- // events if available.
+ inline void AppendNonFtraceEvent(TimestampedTracePiece ttp) {
Queue* queue = GetQueue(0);
queue->Append(std::move(ttp));
- MaybeExtractEvents(queue);
+ UpdateGlobalTs(queue);
}
- inline void MaybeExtractEvents(Queue* queue) {
- DCHECK_ftrace_batch_cpu(kNoBatch);
- global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
+ inline void UpdateGlobalTs(Queue* queue) {
global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
-
- // Fast path: if, globally, we are within the window size, then just exit.
- if (global_max_ts_ - global_min_ts_ < window_size_ns_)
- return;
- SortAndExtractEventsBeyondWindow(window_size_ns_);
+ global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
}
void MaybePushEvent(size_t queue_idx,
TimestampedTracePiece ttp) PERFETTO_ALWAYS_INLINE;
+ TraceProcessorContext* context_;
std::unique_ptr<TraceParser> parser_;
+ // Whether we should ignore incremental extraction and just wait for
+  // forced extraction at the end of the trace.
+ SortingMode sorting_mode_ = SortingMode::kDefault;
+
+ // The packet index until which events should be extracted. Set based
+ // on the packet index in |OnReadBuffer|.
+ uint64_t packet_idx_for_extraction_ = 0;
+
+ // The number of flushes which have happened since the last incremental
+ // extraction.
+ uint32_t flushes_since_extraction_ = 0;
+
// queues_[0] is the general (non-ftrace) queue.
// queues_[1] is the ftrace queue for CPU(0).
// queues_[x] is the ftrace queue for CPU(x - 1).
std::vector<Queue> queues_;
- // Events are propagated to the next stage only after (max - min) timestamp
- // is larger than this value.
- int64_t window_size_ns_;
-
// max(e.timestamp for e in queues_).
int64_t global_max_ts_ = 0;
@@ -283,23 +274,8 @@
// Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
bool bypass_next_stage_for_testing_ = false;
-#if PERFETTO_DCHECK_IS_ON()
- // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
- uint32_t ftrace_batch_cpu_ = kNoBatch;
-
- inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
- PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
- }
-
- inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
- PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
- cpu == kNoBatch);
- ftrace_batch_cpu_ = cpu;
- }
-#else
- inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
- inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
-#endif
+ // max(e.ts for e pushed to next stage)
+ int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};
} // namespace trace_processor
diff --git a/src/trace_processor/trace_sorter_unittest.cc b/src/trace_processor/trace_sorter_unittest.cc
index b49f515..18c6d1e 100644
--- a/src/trace_processor/trace_sorter_unittest.cc
+++ b/src/trace_processor/trace_sorter_unittest.cc
@@ -76,13 +76,16 @@
: test_buffer_(std::unique_ptr<uint8_t[]>(new uint8_t[8]), 0, 8) {
storage_ = new NiceMock<MockTraceStorage>();
context_.storage.reset(storage_);
+ CreateSorter();
+ }
+ void CreateSorter(bool full_sort = true) {
std::unique_ptr<MockTraceParser> parser(new MockTraceParser(&context_));
parser_ = parser.get();
-
+ auto sorting_mode = full_sort ? TraceSorter::SortingMode::kFullSort
+ : TraceSorter::SortingMode::kDefault;
context_.sorter.reset(
- new TraceSorter(std::move(parser),
- std::numeric_limits<int64_t>::max() /*window_size*/));
+ new TraceSorter(&context_, std::move(parser), sorting_mode));
}
protected:
@@ -98,7 +101,6 @@
EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view.data(), 1));
context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
std::move(view), &state);
- context_.sorter->FinalizeFtraceEventBatch(0);
context_.sorter->ExtractEventsForced();
}
@@ -107,7 +109,6 @@
TraceBlobView view = test_buffer_.slice(0, 1);
EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, view.data(), 1));
context_.sorter->PushTracePacket(1000, &state, std::move(view));
- context_.sorter->FinalizeFtraceEventBatch(1000);
context_.sorter->ExtractEventsForced();
}
@@ -125,64 +126,134 @@
EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
- context_.sorter->SetWindowSizeNs(200);
context_.sorter->PushFtraceEvent(2 /*cpu*/, 1200 /*timestamp*/,
std::move(view_4), &state);
- context_.sorter->FinalizeFtraceEventBatch(2);
context_.sorter->PushTracePacket(1001, &state, std::move(view_2));
context_.sorter->PushTracePacket(1100, &state, std::move(view_3));
context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
std::move(view_1), &state);
-
- context_.sorter->FinalizeFtraceEventBatch(0);
context_.sorter->ExtractEventsForced();
}
-TEST_F(TraceSorterTest, SetWindowSize) {
+TEST_F(TraceSorterTest, IncrementalExtraction) {
+ CreateSorter(false);
+
PacketSequenceState state(&context_);
+
+ TraceBlobView view_1 = test_buffer_.slice(0, 1);
+ TraceBlobView view_2 = test_buffer_.slice(0, 2);
+ TraceBlobView view_3 = test_buffer_.slice(0, 3);
+ TraceBlobView view_4 = test_buffer_.slice(0, 4);
+ TraceBlobView view_5 = test_buffer_.slice(0, 5);
+
+ // Flush at the start of packet sequence to match behavior of the
+ // service.
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->PushTracePacket(1200, &state, std::move(view_2));
+ context_.sorter->PushTracePacket(1100, &state, std::move(view_1));
+
+  // No data should be extracted at this point because we haven't
+ // seen two flushes yet.
+ context_.sorter->NotifyReadBufferEvent();
+
+ // Now that we've seen two flushes, we should be ready to start extracting
+  // data on the next OnReadBuffer call (after two flushes as usual).
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->NotifyReadBufferEvent();
+
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->PushTracePacket(1400, &state, std::move(view_4));
+ context_.sorter->PushTracePacket(1300, &state, std::move(view_3));
+
+ // This ReadBuffer call should finally extract until the first OnReadBuffer
+ // call.
+ {
+ InSequence s;
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
+ }
+ context_.sorter->NotifyReadBufferEvent();
+
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->PushTracePacket(1500, &state, std::move(view_5));
+
+ // Nothing should be extracted as we haven't seen the second flush.
+ context_.sorter->NotifyReadBufferEvent();
+
+ // Now we've seen the second flush we should extract the next two packets.
+ context_.sorter->NotifyFlushEvent();
+ {
+ InSequence s;
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1300, test_buffer_.data(), 3));
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1400, test_buffer_.data(), 4));
+ }
+ context_.sorter->NotifyReadBufferEvent();
+
+ // The forced extraction should get the last packet.
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1500, test_buffer_.data(), 5));
+ context_.sorter->ExtractEventsForced();
+}
+
+// Simulate a producer bug where the third packet is emitted
+// out of order. Verify that we track the stats correctly.
+TEST_F(TraceSorterTest, OutOfOrder) {
+ CreateSorter(false);
+
+ PacketSequenceState state(&context_);
+
TraceBlobView view_1 = test_buffer_.slice(0, 1);
TraceBlobView view_2 = test_buffer_.slice(0, 2);
TraceBlobView view_3 = test_buffer_.slice(0, 3);
TraceBlobView view_4 = test_buffer_.slice(0, 4);
- MockFunction<void(std::string check_point_name)> check;
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->PushTracePacket(1200, &state, std::move(view_2));
+ context_.sorter->PushTracePacket(1100, &state, std::move(view_1));
+ context_.sorter->NotifyReadBufferEvent();
+ // Both of the packets should have been pushed through.
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->NotifyFlushEvent();
{
InSequence s;
-
- EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, view_1.data(), 1));
- EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, view_2.data(), 2));
- EXPECT_CALL(check, Call("1"));
- EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, view_3.data(), 3));
- EXPECT_CALL(check, Call("2"));
- EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, view_4.data(), 4));
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
}
+ context_.sorter->NotifyReadBufferEvent();
- context_.sorter->SetWindowSizeNs(200);
- context_.sorter->PushFtraceEvent(2 /*cpu*/, 1200 /*timestamp*/,
- std::move(view_4), &state);
- context_.sorter->FinalizeFtraceEventBatch(2);
- context_.sorter->PushTracePacket(1001, &state, std::move(view_2));
- context_.sorter->PushTracePacket(1100, &state, std::move(view_3));
+ // Now, pass the third packet out of order.
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->PushTracePacket(1150, &state, std::move(view_3));
+ context_.sorter->NotifyReadBufferEvent();
- context_.sorter->PushFtraceEvent(0 /*cpu*/, 1000 /*timestamp*/,
- std::move(view_1), &state);
- context_.sorter->FinalizeFtraceEventBatch(0);
+ // The third packet should still be pushed through.
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->NotifyFlushEvent();
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1150, test_buffer_.data(), 3));
+ context_.sorter->NotifyReadBufferEvent();
- // At this point, we should just flush the 1000 and 1001 packets.
- context_.sorter->SetWindowSizeNs(101);
+ // But we should also increment the stat that this was out of order.
+ ASSERT_EQ(
+ context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
+ 1);
- // Inform the mock about where we are.
- check.Call("1");
+ // Push the fourth packet also out of order but after third.
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->NotifyFlushEvent();
+ context_.sorter->PushTracePacket(1170, &state, std::move(view_4));
+ context_.sorter->NotifyReadBufferEvent();
- // Now we should flush the 1100 packet.
- context_.sorter->SetWindowSizeNs(99);
-
- // Inform the mock about where we are.
- check.Call("2");
-
- // Now we should flush the 1200 packet.
+  // The fourth packet should still be pushed through.
+ EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1170, test_buffer_.data(), 4));
context_.sorter->ExtractEventsForced();
+
+ // But we should also increment the stat that this was out of order.
+ ASSERT_EQ(
+ context_.storage->stats()[stats::sorter_push_event_out_of_order].value,
+ 2);
}
// Simulates a random stream of ftrace events happening on random CPUs.
@@ -219,7 +290,6 @@
expectations[ts].push_back(cpu);
context_.sorter->PushFtraceEvent(cpu, ts, TraceBlobView(nullptr, 0, 0),
&state);
- context_.sorter->FinalizeFtraceEventBatch(cpu);
}
}