TraceProcessor: Add binary RPC interface
Doc: https://docs.google.com/document/d/1Xno5BovxYweoSC6G3UjcsVJD65FLFTUjzUR0FXUmt0s/edit?usp=sharing
This CL doesn't wire it up yet to RPC or Wasm.
Next CLs will do.
Test: perfetto_unittests --gtest_filter=QueryResultSerializerTest.*
Change-Id: I08b7bb638fe4deabf41e14e30fc548aec9ab58e4
Bug: 159142289
diff --git a/Android.bp b/Android.bp
index ce5d302..dd7c365 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3131,6 +3131,158 @@
],
}
+// GN: //protos/perfetto/metrics/android:zero
+genrule {
+ name: "perfetto_protos_perfetto_metrics_android_zero_gen",
+ srcs: [
+ "protos/perfetto/metrics/android/batt_metric.proto",
+ "protos/perfetto/metrics/android/cpu_metric.proto",
+ "protos/perfetto/metrics/android/display_metrics.proto",
+ "protos/perfetto/metrics/android/heap_profile_callsites.proto",
+ "protos/perfetto/metrics/android/hwui_metric.proto",
+ "protos/perfetto/metrics/android/ion_metric.proto",
+ "protos/perfetto/metrics/android/java_heap_histogram.proto",
+ "protos/perfetto/metrics/android/java_heap_stats.proto",
+ "protos/perfetto/metrics/android/lmk_metric.proto",
+ "protos/perfetto/metrics/android/lmk_reason_metric.proto",
+ "protos/perfetto/metrics/android/mem_metric.proto",
+ "protos/perfetto/metrics/android/mem_unagg_metric.proto",
+ "protos/perfetto/metrics/android/package_list.proto",
+ "protos/perfetto/metrics/android/powrails_metric.proto",
+ "protos/perfetto/metrics/android/process_metadata.proto",
+ "protos/perfetto/metrics/android/startup_metric.proto",
+ "protos/perfetto/metrics/android/surfaceflinger.proto",
+ "protos/perfetto/metrics/android/task_names.proto",
+ "protos/perfetto/metrics/android/thread_time_in_state_metric.proto",
+ "protos/perfetto/metrics/android/unmapped_java_symbols.proto",
+ "protos/perfetto/metrics/android/unsymbolized_frames.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "protozero_plugin",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+ out: [
+ "external/perfetto/protos/perfetto/metrics/android/batt_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/cpu_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/display_metrics.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/heap_profile_callsites.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/hwui_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/ion_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/java_heap_histogram.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/java_heap_stats.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/lmk_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/lmk_reason_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/mem_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/mem_unagg_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/package_list.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/powrails_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/process_metadata.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/startup_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/surfaceflinger.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/task_names.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/thread_time_in_state_metric.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/unmapped_java_symbols.pbzero.cc",
+ "external/perfetto/protos/perfetto/metrics/android/unsymbolized_frames.pbzero.cc",
+ ],
+}
+
+// GN: //protos/perfetto/metrics/android:zero
+genrule {
+ name: "perfetto_protos_perfetto_metrics_android_zero_gen_headers",
+ srcs: [
+ "protos/perfetto/metrics/android/batt_metric.proto",
+ "protos/perfetto/metrics/android/cpu_metric.proto",
+ "protos/perfetto/metrics/android/display_metrics.proto",
+ "protos/perfetto/metrics/android/heap_profile_callsites.proto",
+ "protos/perfetto/metrics/android/hwui_metric.proto",
+ "protos/perfetto/metrics/android/ion_metric.proto",
+ "protos/perfetto/metrics/android/java_heap_histogram.proto",
+ "protos/perfetto/metrics/android/java_heap_stats.proto",
+ "protos/perfetto/metrics/android/lmk_metric.proto",
+ "protos/perfetto/metrics/android/lmk_reason_metric.proto",
+ "protos/perfetto/metrics/android/mem_metric.proto",
+ "protos/perfetto/metrics/android/mem_unagg_metric.proto",
+ "protos/perfetto/metrics/android/package_list.proto",
+ "protos/perfetto/metrics/android/powrails_metric.proto",
+ "protos/perfetto/metrics/android/process_metadata.proto",
+ "protos/perfetto/metrics/android/startup_metric.proto",
+ "protos/perfetto/metrics/android/surfaceflinger.proto",
+ "protos/perfetto/metrics/android/task_names.proto",
+ "protos/perfetto/metrics/android/thread_time_in_state_metric.proto",
+ "protos/perfetto/metrics/android/unmapped_java_symbols.proto",
+ "protos/perfetto/metrics/android/unsymbolized_frames.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "protozero_plugin",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+ out: [
+ "external/perfetto/protos/perfetto/metrics/android/batt_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/cpu_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/display_metrics.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/heap_profile_callsites.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/hwui_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/ion_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/java_heap_histogram.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/java_heap_stats.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/lmk_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/lmk_reason_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/mem_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/mem_unagg_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/package_list.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/powrails_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/process_metadata.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/startup_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/surfaceflinger.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/task_names.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/thread_time_in_state_metric.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/unmapped_java_symbols.pbzero.h",
+ "external/perfetto/protos/perfetto/metrics/android/unsymbolized_frames.pbzero.h",
+ ],
+ export_include_dirs: [
+ ".",
+ "protos",
+ ],
+}
+
+// GN: //protos/perfetto/metrics:zero
+genrule {
+ name: "perfetto_protos_perfetto_metrics_zero_gen",
+ srcs: [
+ "protos/perfetto/metrics/metrics.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "protozero_plugin",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+ out: [
+ "external/perfetto/protos/perfetto/metrics/metrics.pbzero.cc",
+ ],
+}
+
+// GN: //protos/perfetto/metrics:zero
+genrule {
+ name: "perfetto_protos_perfetto_metrics_zero_gen_headers",
+ srcs: [
+ "protos/perfetto/metrics/metrics.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "protozero_plugin",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+ out: [
+ "external/perfetto/protos/perfetto/metrics/metrics.pbzero.h",
+ ],
+ export_include_dirs: [
+ ".",
+ "protos",
+ ],
+}
+
// GN: //protos/perfetto/trace/android:cpp
genrule {
name: "perfetto_protos_perfetto_trace_android_cpp_gen",
@@ -4863,6 +5015,42 @@
],
}
+// GN: //protos/perfetto/trace_processor:zero
+genrule {
+ name: "perfetto_protos_perfetto_trace_processor_zero_gen",
+ srcs: [
+ "protos/perfetto/trace_processor/trace_processor.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "protozero_plugin",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+ out: [
+ "external/perfetto/protos/perfetto/trace_processor/trace_processor.pbzero.cc",
+ ],
+}
+
+// GN: //protos/perfetto/trace_processor:zero
+genrule {
+ name: "perfetto_protos_perfetto_trace_processor_zero_gen_headers",
+ srcs: [
+ "protos/perfetto/trace_processor/trace_processor.proto",
+ ],
+ tools: [
+ "aprotoc",
+ "protozero_plugin",
+ ],
+ cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+ out: [
+ "external/perfetto/protos/perfetto/trace_processor/trace_processor.pbzero.h",
+ ],
+ export_include_dirs: [
+ ".",
+ "protos",
+ ],
+}
+
// GN: //protos/perfetto/trace/profiling:cpp
genrule {
name: "perfetto_protos_perfetto_trace_profiling_cpp_gen",
@@ -6556,6 +6744,23 @@
],
}
+// GN: //src/trace_processor/rpc:rpc
+filegroup {
+ name: "perfetto_src_trace_processor_rpc_rpc",
+ srcs: [
+ "src/trace_processor/rpc/query_result_serializer.cc",
+ "src/trace_processor/rpc/rpc.cc",
+ ],
+}
+
+// GN: //src/trace_processor/rpc:unittests
+filegroup {
+ name: "perfetto_src_trace_processor_rpc_unittests",
+ srcs: [
+ "src/trace_processor/rpc/query_result_serializer_unittest.cc",
+ ],
+}
+
// GN: //src/trace_processor/sqlite:sqlite
filegroup {
name: "perfetto_src_trace_processor_sqlite_sqlite",
@@ -7550,6 +7755,8 @@
":perfetto_protos_perfetto_ipc_cpp_gen",
":perfetto_protos_perfetto_ipc_ipc_gen",
":perfetto_protos_perfetto_ipc_wire_protocol_cpp_gen",
+ ":perfetto_protos_perfetto_metrics_android_zero_gen",
+ ":perfetto_protos_perfetto_metrics_zero_gen",
":perfetto_protos_perfetto_trace_android_cpp_gen",
":perfetto_protos_perfetto_trace_android_zero_gen",
":perfetto_protos_perfetto_trace_chrome_cpp_gen",
@@ -7571,6 +7778,7 @@
":perfetto_protos_perfetto_trace_power_cpp_gen",
":perfetto_protos_perfetto_trace_power_zero_gen",
":perfetto_protos_perfetto_trace_processor_metrics_impl_zero_gen",
+ ":perfetto_protos_perfetto_trace_processor_zero_gen",
":perfetto_protos_perfetto_trace_profiling_cpp_gen",
":perfetto_protos_perfetto_trace_profiling_zero_gen",
":perfetto_protos_perfetto_trace_ps_cpp_gen",
@@ -7637,6 +7845,8 @@
":perfetto_src_trace_processor_metatrace",
":perfetto_src_trace_processor_metrics_lib",
":perfetto_src_trace_processor_metrics_unittests",
+ ":perfetto_src_trace_processor_rpc_rpc",
+ ":perfetto_src_trace_processor_rpc_unittests",
":perfetto_src_trace_processor_sqlite_sqlite",
":perfetto_src_trace_processor_sqlite_unittests",
":perfetto_src_trace_processor_storage_full",
@@ -7741,6 +7951,8 @@
"perfetto_protos_perfetto_ipc_cpp_gen_headers",
"perfetto_protos_perfetto_ipc_ipc_gen_headers",
"perfetto_protos_perfetto_ipc_wire_protocol_cpp_gen_headers",
+ "perfetto_protos_perfetto_metrics_android_zero_gen_headers",
+ "perfetto_protos_perfetto_metrics_zero_gen_headers",
"perfetto_protos_perfetto_trace_android_cpp_gen_headers",
"perfetto_protos_perfetto_trace_android_zero_gen_headers",
"perfetto_protos_perfetto_trace_chrome_cpp_gen_headers",
@@ -7762,6 +7974,7 @@
"perfetto_protos_perfetto_trace_power_cpp_gen_headers",
"perfetto_protos_perfetto_trace_power_zero_gen_headers",
"perfetto_protos_perfetto_trace_processor_metrics_impl_zero_gen_headers",
+ "perfetto_protos_perfetto_trace_processor_zero_gen_headers",
"perfetto_protos_perfetto_trace_profiling_cpp_gen_headers",
"perfetto_protos_perfetto_trace_profiling_zero_gen_headers",
"perfetto_protos_perfetto_trace_ps_cpp_gen_headers",
diff --git a/BUILD b/BUILD
index fa4dc90..13ad87a 100644
--- a/BUILD
+++ b/BUILD
@@ -815,6 +815,8 @@
filegroup(
name = "src_trace_processor_rpc_rpc",
srcs = [
+ "src/trace_processor/rpc/query_result_serializer.cc",
+ "src/trace_processor/rpc/query_result_serializer.h",
"src/trace_processor/rpc/rpc.cc",
"src/trace_processor/rpc/rpc.h",
],
diff --git a/gn/perfetto_benchmarks.gni b/gn/perfetto_benchmarks.gni
index 46e067c..ca2e36a 100644
--- a/gn/perfetto_benchmarks.gni
+++ b/gn/perfetto_benchmarks.gni
@@ -25,6 +25,7 @@
"src/traced/probes/ftrace:benchmarks",
"src/tracing/core:benchmarks",
"src/tracing:benchmarks",
+ "src/trace_processor/rpc:benchmarks",
"test:benchmark_main",
"test:end_to_end_benchmarks",
]
diff --git a/include/perfetto/trace_processor/basic_types.h b/include/perfetto/trace_processor/basic_types.h
index cc41758..7cdfc1a 100644
--- a/include/perfetto/trace_processor/basic_types.h
+++ b/include/perfetto/trace_processor/basic_types.h
@@ -85,6 +85,14 @@
return value;
}
+ static SqlValue Bytes(const void* v, size_t size) {
+ SqlValue value;
+ value.bytes_value = v;
+ value.bytes_count = size;
+ value.type = Type::kBytes;
+ return value;
+ }
+
double AsDouble() const {
PERFETTO_CHECK(type == kDouble);
return double_value;
diff --git a/include/perfetto/trace_processor/iterator.h b/include/perfetto/trace_processor/iterator.h
index 9578f8f..f61f5d7 100644
--- a/include/perfetto/trace_processor/iterator.h
+++ b/include/perfetto/trace_processor/iterator.h
@@ -65,6 +65,15 @@
util::Status Status();
private:
+ friend class QueryResultSerializer;
+
+ // This is to allow QueryResultSerializer, which is very perf sensitive, to
+ // access direct the impl_ and avoid one extra function call for each cell.
+ template <typename T = IteratorImpl>
+ std::unique_ptr<T> take_impl() {
+ return std::move(iterator_);
+ }
+
// A PIMPL pattern is used to avoid leaking the dependencies on sqlite3.h and
// other internal classes.
std::unique_ptr<IteratorImpl> iterator_;
diff --git a/protos/perfetto/trace_processor/trace_processor.proto b/protos/perfetto/trace_processor/trace_processor.proto
index 66bbe63..6dc3fe1 100644
--- a/protos/perfetto/trace_processor/trace_processor.proto
+++ b/protos/perfetto/trace_processor/trace_processor.proto
@@ -44,6 +44,7 @@
}
// Output for the /raw_query endpoint.
+// DEPRECATED, use /query. See QueryResult below.
message RawQueryResult {
message ColumnDesc {
optional string name = 1;
@@ -74,6 +75,60 @@
optional uint64 execution_time_ns = 5;
}
+// Output for the /query endpoint.
+// Returns a query result set, grouping cells into batches. Batching allows a
+// more efficient encoding of results, at the same time allowing to return
+// O(M) results in a pipelined fashion, without full-memory buffering.
+// Batches are split when either a large number of cells (~thousands) is reached
+// or the string/blob payload becomes too large (~hundreds of KB).
+// Data is batched in cells, scanning results by row -> column. e.g. if a query
+// returns 3 columns and 2 rows, the cells will be emitted in this order:
+// R0C0, R0C1, R0C2, R1C0, R1C1, R1C2.
+message QueryResult {
+ // This determines the number and names of columns.
+ repeated string column_names = 1;
+
+ // If non-emty the query returned an error. Note that some cells might still
+ // be present, if the error happened while iterating.
+ optional string error = 2;
+
+ // A batch contains an array of cell headers, stating the type of each cell.
+ // The payload of each cell is stored in the corresponding xxx_cells field
+ // below (unless the cell is NULL).
+ // So if |cells| contains: [VARINT, FLOAT64, VARINT, STRING], the results will
+ // be available as:
+ // [varint_cells[0], float64_cells[0], varint_cells[1], string_cells[0]].
+ message CellsBatch {
+ enum CellType {
+ CELL_INVALID = 0;
+ CELL_NULL = 1;
+ CELL_VARINT = 2;
+ CELL_FLOAT64 = 3;
+ CELL_STRING = 4;
+ CELL_BLOB = 5;
+ }
+ repeated CellType cells = 1 [packed = true];
+
+ repeated int64 varint_cells = 2 [packed = true];
+ repeated double float64_cells = 3 [packed = true];
+ repeated bytes blob_cells = 4;
+
+ // The string cells are concatenated in a single field. Each cell is
+ // NUL-terminated. This is because JS incurs into a non-negligible overhead
+ // when decoding strings and one decode + split('\0') is measurably faster
+ // than decoding N strings. See goto.google.com/postmessage-benchmark .
+ // \0-concatenated.
+ optional string string_cells = 5;
+
+ // If true this is the last batch for the query result.
+ optional bool is_last_batch = 6;
+
+ // Padding field. Used only to re-align and fill gaps in the binary format.
+ reserved 7;
+ }
+ repeated CellsBatch batch = 3;
+}
+
// Input for the /status endpoint.
message StatusArgs {}
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index cc061ae..02925da 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -299,7 +299,6 @@
":metatrace",
":storage_full",
"../../gn:default_deps",
- "../../gn:sqlite",
"../../protos/perfetto/trace/ftrace:zero",
"../base",
"analysis",
@@ -311,7 +310,10 @@
"tables",
"types",
]
- public_deps = [ "../../include/perfetto/trace_processor" ]
+ public_deps = [
+ "../../gn:sqlite", # iterator_impl.h includes sqlite3.h.
+ "../../include/perfetto/trace_processor",
+ ]
if (enable_perfetto_trace_processor_json) {
deps += [ "../../gn:jsoncpp" ]
}
@@ -381,6 +383,7 @@
"db:unittests",
"importers:common",
"importers:unittests",
+ "rpc:unittests",
"storage",
"tables:unittests",
"types",
diff --git a/src/trace_processor/rpc/BUILD.gn b/src/trace_processor/rpc/BUILD.gn
index 15690b2..c81552c 100644
--- a/src/trace_processor/rpc/BUILD.gn
+++ b/src/trace_processor/rpc/BUILD.gn
@@ -13,6 +13,7 @@
# limitations under the License.
import("../../../gn/perfetto.gni")
+import("../../../gn/test.gni")
import("../../../gn/wasm.gni")
# Prevent that this file is accidentally included in embedder builds.
@@ -22,10 +23,13 @@
# interface) and by the :httpd module for the HTTP interface.
source_set("rpc") {
sources = [
+ "query_result_serializer.cc",
+ "query_result_serializer.h",
"rpc.cc",
"rpc.h",
]
deps = [
+ "..:lib",
"..:metatrace",
"../../../gn:default_deps",
"../../../include/perfetto/trace_processor",
@@ -35,6 +39,19 @@
]
}
+perfetto_unittest_source_set("unittests") {
+ testonly = true
+ sources = [ "query_result_serializer_unittest.cc" ]
+ deps = [
+ ":rpc",
+ "..:lib",
+ "../../../gn:default_deps",
+ "../../../gn:gtest_and_gmock",
+ "../../../protos/perfetto/trace_processor:zero",
+ "../../base",
+ ]
+}
+
if (enable_perfetto_trace_processor_httpd) {
source_set("httpd") {
sources = [
@@ -64,3 +81,18 @@
]
}
}
+
+if (enable_perfetto_benchmarks) {
+ source_set("benchmarks") {
+ testonly = true
+ deps = [
+ ":rpc",
+ "..:lib",
+ "../../../gn:benchmark",
+ "../../../gn:default_deps",
+ "../../../gn:sqlite",
+ "../../base",
+ ]
+ sources = [ "query_result_serializer_benchmark.cc" ]
+ }
+}
diff --git a/src/trace_processor/rpc/query_result_serializer.cc b/src/trace_processor/rpc/query_result_serializer.cc
new file mode 100644
index 0000000..be44297
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer.cc
@@ -0,0 +1,297 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/query_result_serializer.h"
+
+#include <vector>
+
+#include "perfetto/protozero/packed_repeated_fields.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+#include "src/trace_processor/iterator_impl.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+namespace {
+
+namespace pu = ::protozero::proto_utils;
+using BatchProto = protos::pbzero::QueryResult::CellsBatch;
+using ResultProto = protos::pbzero::QueryResult;
+
+// The reserved field in trace_processor.proto.
+static constexpr uint32_t kPaddingFieldId = 7;
+
+uint8_t MakeLenDelimTag(uint32_t field_num) {
+ uint32_t tag = pu::MakeTagLengthDelimited(field_num);
+ PERFETTO_DCHECK(tag <= 127); // Must fit in one byte.
+ return static_cast<uint8_t>(tag);
+}
+
+} // namespace
+
+QueryResultSerializer::QueryResultSerializer(Iterator iter)
+ : iter_(iter.take_impl()), num_cols_(iter_->ColumnCount()) {}
+
+QueryResultSerializer::~QueryResultSerializer() = default;
+
+bool QueryResultSerializer::Serialize(std::vector<uint8_t>* buf) {
+ PERFETTO_CHECK(!eof_reached_);
+
+ // In non-production builds avoid the big reservation. This is to avoid hiding
+ // bugs that accidentally depend on pointer stability across resizes.
+#if !PERFETTO_DCHECK_IS_ON()
+ buf->reserve(buf->size() + batch_split_threshold_ + 4096);
+#endif
+
+ if (!did_write_column_names_) {
+ SerializeColumnNames(buf);
+ did_write_column_names_ = true;
+ }
+
+ // In case of an error we still want to go through SerializeBatch(). That will
+ // write an empty batch with the EOF marker. Errors can happen also in the
+ // middle of a query, not just before starting it.
+
+ SerializeBatch(buf);
+ MaybeSerializeError(buf);
+
+ return !eof_reached_;
+}
+
+void QueryResultSerializer::SerializeBatch(std::vector<uint8_t>* buf) {
+ // The buffer is filled in this way:
+ // - Append all the strings as we iterate through the results. The rationale
+ // is that strings are typically the largest part of the result and we want
+ // to avoid copying these.
+ // - While iterating, buffer all other types of cells. They will be appended
+ // at the end of the batch, after the string payload is known.
+
+ // Note: this function uses uint32_t instead of size_t because Wasm doesn't
+ // have yet native 64-bit integers and this is perf-sensitive.
+ const uint32_t initial_size = static_cast<uint32_t>(buf->size());
+
+ buf->push_back(MakeLenDelimTag(ResultProto::kBatchFieldNumber));
+ const uint32_t batch_size_hdr = static_cast<uint32_t>(buf->size());
+ buf->resize(batch_size_hdr + pu::kMessageLengthFieldSize);
+
+ // Start the |string_cells|.
+ buf->push_back(MakeLenDelimTag(BatchProto::kStringCellsFieldNumber));
+ const uint32_t strings_hdr_off = static_cast<uint32_t>(buf->size());
+ buf->resize(strings_hdr_off + pu::kMessageLengthFieldSize);
+ const uint32_t strings_start_off = static_cast<uint32_t>(buf->size());
+
+ // This keeps track of the overall size of the batch. It is used to decide if
+ // we need to prematurely end the batch, even if the batch_split_threshold_ is
+ // not reached. This is to guard against the degenerate case of appending a
+ // lot of very large strings and ending up with an enormous batch.
+ auto approx_batch_size = static_cast<uint32_t>(buf->size()) - initial_size;
+
+ std::vector<uint8_t> cell_types(cells_per_batch_);
+
+ // Varints and doubles are written on stack-based storage and appended later.
+ protozero::PackedVarInt varints;
+ protozero::PackedFixedSizeInt<double> doubles;
+
+ // We write blobs on a temporary heap buffer and append it at the end. Blobs
+ // are extremely rare, trying to avoid copies is not worth the complexity.
+ std::vector<uint8_t> blobs;
+
+ uint32_t cell_idx = 0;
+ bool batch_full = false;
+
+ // Skip block if the query didn't return any result (e.g. CREATE TABLE).
+ while (num_cols_ > 0) {
+ // Next needs to be called before iterating on a row. col_ is initialized
+ // at MAX_INT in the constructor, so in the very first iteration this causes
+ // a Next() call.
+ if (col_ >= num_cols_) {
+ col_ = 0;
+ if (!iter_->Next())
+ break; // EOF or error.
+ }
+
+ auto value = iter_->Get(col_);
+ uint8_t cell_type = BatchProto::CELL_INVALID;
+ switch (value.type) {
+ case SqlValue::Type::kNull: {
+ cell_type = BatchProto::CELL_NULL;
+ break;
+ }
+ case SqlValue::Type::kLong: {
+ cell_type = BatchProto::CELL_VARINT;
+ varints.Append(value.long_value);
+ approx_batch_size += 4; // Just a guess, doesn't need to be accurate.
+ break;
+ }
+ case SqlValue::Type::kDouble: {
+ cell_type = BatchProto::CELL_FLOAT64;
+ approx_batch_size += sizeof(double);
+ doubles.Append(value.double_value);
+ break;
+ }
+ case SqlValue::Type::kString: {
+ // Append the string to the one |string_cells| proto field, just use
+ // \0 to separate each string. We are deliberately NOT emitting one
+ // proto repeated field for each string. Doing so significantly slows
+ // down parsing on the JS side (go/postmessage-benchmark).
+ cell_type = BatchProto::CELL_STRING;
+ uint32_t len_with_nul =
+ static_cast<uint32_t>(strlen(value.string_value)) + 1;
+ const char* str_begin = value.string_value;
+ buf->insert(buf->end(), str_begin, str_begin + len_with_nul);
+ approx_batch_size += len_with_nul;
+ break;
+ }
+ case SqlValue::Type::kBytes: {
+ // Each blob is stored as its own repeated proto field, unlike strings.
+ // Blobs don't incur in text-decoding overhead (and are also rare).
+ cell_type = BatchProto::CELL_BLOB;
+ auto* src = static_cast<const uint8_t*>(value.bytes_value);
+ uint32_t len = static_cast<uint32_t>(value.bytes_count);
+ uint8_t preamble[16];
+ uint8_t* preamble_end = &preamble[0];
+ *(preamble_end++) = MakeLenDelimTag(BatchProto::kBlobCellsFieldNumber);
+ preamble_end = pu::WriteVarInt(len, preamble_end);
+ blobs.insert(blobs.end(), preamble, preamble_end);
+ blobs.insert(blobs.end(), src, src + len);
+ approx_batch_size += len + 4; // 4 is a guess on the preamble size.
+ break;
+ }
+ }
+
+ PERFETTO_DCHECK(cell_type != BatchProto::CELL_INVALID);
+ cell_types[cell_idx] = cell_type;
+
+ // The wrapping + iter_->Next() is done in the beginning to deal with
+ // the very first Next() call in one place.
+ ++cell_idx;
+ ++col_;
+
+ if (cell_idx >= cells_per_batch_ ||
+ approx_batch_size > batch_split_threshold_) {
+ batch_full = true;
+ break;
+ }
+
+ } // for (cell)
+
+ // Backfill the string size.
+ auto strings_size = static_cast<uint32_t>(buf->size() - strings_start_off);
+ pu::WriteRedundantVarInt(strings_size, buf->data() + strings_hdr_off);
+
+ // Write the cells headers (1 byte per cell).
+ {
+ uint8_t preamble[16];
+ uint8_t* preamble_end = &preamble[0];
+ *(preamble_end++) = MakeLenDelimTag(BatchProto::kCellsFieldNumber);
+ preamble_end = pu::WriteVarInt(cell_idx, preamble_end);
+ buf->insert(buf->end(), preamble, preamble_end);
+ buf->insert(buf->end(), cell_types.data(), cell_types.data() + cell_idx);
+ }
+
+ // Append the |varint_cells|, copying over the packed varint buffer.
+ const uint32_t varints_size = static_cast<uint32_t>(varints.size());
+ if (varints_size > 0) {
+ uint8_t preamble[16];
+ uint8_t* preamble_end = &preamble[0];
+ *(preamble_end++) = MakeLenDelimTag(BatchProto::kVarintCellsFieldNumber);
+ preamble_end = pu::WriteVarInt(varints_size, preamble_end);
+ buf->insert(buf->end(), preamble, preamble_end);
+ buf->insert(buf->end(), varints.data(), varints.data() + varints_size);
+ }
+
+ // Append the |float64_cells|, copying over the packed fixed64 buffer. This is
+ // appended at a 64-bit aligned offset, so that JS can access these by overlay
+ // a TypedArray, without extra copies.
+ const uint32_t doubles_size = static_cast<uint32_t>(doubles.size());
+ if (doubles_size > 0) {
+ uint8_t preamble[16];
+ uint8_t* preamble_end = &preamble[0];
+ *(preamble_end++) = MakeLenDelimTag(BatchProto::kFloat64CellsFieldNumber);
+ preamble_end = pu::WriteVarInt(doubles_size, preamble_end);
+ uint32_t preamble_size = static_cast<uint32_t>(preamble_end - &preamble[0]);
+
+ // The byte after the preamble must start at a 64bit-aligned offset.
+ // The padding needs to be > 1 Byte because of proto encoding.
+ const uint32_t off = static_cast<uint32_t>(buf->size() + preamble_size);
+ const uint32_t aligned_off = (off + 7) & ~7u;
+ uint32_t padding = aligned_off - off;
+ if (padding == 1)
+ padding = 9;
+
+ if (padding > 0) {
+ buf->push_back(pu::MakeTagVarInt(kPaddingFieldId));
+ for (uint32_t i = 0; i < padding - 2; i++)
+ buf->push_back(0x80);
+ buf->push_back(0);
+ }
+
+ buf->insert(buf->end(), preamble, preamble_end);
+ PERFETTO_CHECK(buf->size() % 8 == 0);
+ buf->insert(buf->end(), doubles.data(), doubles.data() + doubles_size);
+ } // if (doubles_size > 0)
+
+ // Append the blobs.
+ buf->insert(buf->end(), blobs.begin(), blobs.end());
+
+ // If this is the last batch, write the EOF field.
+ if (!batch_full) {
+ eof_reached_ = true;
+ auto kEofTag = pu::MakeTagVarInt(BatchProto::kIsLastBatchFieldNumber);
+ buf->push_back(static_cast<uint8_t>(kEofTag));
+ buf->push_back(1);
+ }
+
+ // Finally backfill the size of the whole |batch| sub-message.
+ const uint32_t batch_size = static_cast<uint32_t>(
+ buf->size() - batch_size_hdr - pu::kMessageLengthFieldSize);
+ pu::WriteRedundantVarInt(batch_size, buf->data() + batch_size_hdr);
+}
+
+void QueryResultSerializer::MaybeSerializeError(std::vector<uint8_t>* buf) {
+ if (iter_->Status().ok())
+ return;
+ std::string err = iter_->Status().message();
+ // Make sure the |error| field is always non-zero if the query failed, so
+ // the client can tell some error happened.
+ if (err.empty())
+ err = "Unknown error";
+
+ // Write the error and return.
+ uint8_t preamble[16];
+ uint8_t* preamble_end = &preamble[0];
+ *(preamble_end++) = MakeLenDelimTag(ResultProto::kErrorFieldNumber);
+ preamble_end = pu::WriteVarInt(err.size(), preamble_end);
+ buf->insert(buf->end(), preamble, preamble_end);
+ buf->insert(buf->end(), err.begin(), err.end());
+}
+
+void QueryResultSerializer::SerializeColumnNames(std::vector<uint8_t>* buf) {
+ PERFETTO_DCHECK(!did_write_column_names_);
+ for (uint32_t c = 0; c < num_cols_; c++) {
+ std::string col_name = iter_->GetColumnName(c);
+ uint8_t preamble[16];
+ uint8_t* preamble_end = &preamble[0];
+ *(preamble_end++) = MakeLenDelimTag(ResultProto::kColumnNamesFieldNumber);
+ preamble_end = pu::WriteVarInt(col_name.size(), preamble_end);
+ buf->insert(buf->end(), preamble, preamble_end);
+ buf->insert(buf->end(), col_name.begin(), col_name.end());
+ }
+}
+
+} // namespace trace_processor
+} // namespace perfetto
diff --git a/src/trace_processor/rpc/query_result_serializer.h b/src/trace_processor/rpc/query_result_serializer.h
new file mode 100644
index 0000000..835f602
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_RPC_QUERY_RESULT_SERIALIZER_H_
+#define SRC_TRACE_PROCESSOR_RPC_QUERY_RESULT_SERIALIZER_H_
+
+#include <memory>
+#include <vector>
+
+#include <limits.h>
+#include <stddef.h>
+#include <stdint.h>
+
+namespace perfetto {
+namespace trace_processor {
+
+class Iterator;
+class IteratorImpl;
+
+// This class serializes a TraceProcessor query result (i.e. an Iterator)
+// into batches of QueryResult (trace_processor.proto). This class
+// returns results in batches, allowing to deal with O(M) results without
+// full memory buffering. It works as follows:
+// - The iterator is passed in the constructor.
+// - The client is expected to call Serialize(out_buf) until EOF is reached.
+// - For each Serialize() call, this class will serialize a batch of cells,
+// stopping when either when a number of cells (|cells_per_batch_|) is reached
+// or when the batch size exceeds (batch_split_threshold_).
+// The intended use case is streaaming these batches onto through a
+// chunked-encoded HTTP response, or through a repetition of Wasm calls.
+class QueryResultSerializer {
+ public:
+ explicit QueryResultSerializer(Iterator);
+ ~QueryResultSerializer();
+
+ // No copy or move.
+ QueryResultSerializer(const QueryResultSerializer&) = delete;
+ QueryResultSerializer& operator=(const QueryResultSerializer&) = delete;
+
+ // Appends the data to the passed vector (note: does NOT clear() the vector
+ // before starting). It returns true if more chunks are available (i.e.
+ // it returns NOT(|eof_reached_||)). The caller is supposed to keep calling
+ // this function until it returns false.
+ bool Serialize(std::vector<uint8_t>*);
+
+ void set_batch_size_for_testing(uint32_t cells_per_batch, uint32_t thres) {
+ cells_per_batch_ = cells_per_batch;
+ batch_split_threshold_ = thres;
+ }
+
+ private:
+ void SerializeColumnNames(std::vector<uint8_t>*);
+ void SerializeBatch(std::vector<uint8_t>*);
+ void MaybeSerializeError(std::vector<uint8_t>*);
+
+ std::unique_ptr<IteratorImpl> iter_;
+ const uint32_t num_cols_;
+ bool did_write_column_names_ = false;
+ bool eof_reached_ = false;
+ uint32_t col_ = UINT32_MAX;
+
+ // Overridable for testing only.
+ uint32_t cells_per_batch_ = 2048;
+ uint32_t batch_split_threshold_ = 1024 * 32;
+};
+
+} // namespace trace_processor
+} // namespace perfetto
+
+#endif // SRC_TRACE_PROCESSOR_RPC_QUERY_RESULT_SERIALIZER_H_
diff --git a/src/trace_processor/rpc/query_result_serializer_benchmark.cc b/src/trace_processor/rpc/query_result_serializer_benchmark.cc
new file mode 100644
index 0000000..339d6ea
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer_benchmark.cc
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/query_result_serializer.h"
+
+#include <benchmark/benchmark.h>
+
+#include "perfetto/trace_processor/basic_types.h"
+#include "perfetto/trace_processor/trace_processor.h"
+
+using perfetto::trace_processor::Config;
+using perfetto::trace_processor::QueryResultSerializer;
+using perfetto::trace_processor::TraceProcessor;
+using VectorType = std::vector<uint8_t>;
+
+namespace {
+
+bool IsBenchmarkFunctionalOnly() {
+ return getenv("BENCHMARK_FUNCTIONAL_TEST_ONLY") != nullptr;
+}
+
+void BenchmarkArgs(benchmark::internal::Benchmark* b) {
+ if (IsBenchmarkFunctionalOnly()) {
+ b->Ranges({{1024, 1024}, {4096, 4096}});
+ } else {
+ b->RangeMultiplier(8)->Ranges({{128, 8192}, {4096, 1024 * 512}});
+ }
+}
+
+void RunQueryChecked(TraceProcessor* tp, const std::string& query) {
+ auto iter = tp->ExecuteQuery(query);
+ iter.Next();
+ PERFETTO_CHECK(iter.Status().ok());
+}
+
+} // namespace
+
+static void BM_QueryResultSerializer_Mixed(benchmark::State& state) {
+ auto tp = TraceProcessor::CreateInstance(Config());
+ RunQueryChecked(tp.get(), "create virtual table win using window;");
+ RunQueryChecked(tp.get(),
+ "update win set window_start=0, window_dur=50000, quantum=1 "
+ "where rowid = 0");
+ VectorType buf;
+ for (auto _ : state) {
+ auto iter = tp->ExecuteQuery(
+ "select dur || dur as x, ts, dur * 1.0 as dur, quantum_ts from win");
+ QueryResultSerializer serializer(std::move(iter));
+ serializer.set_batch_size_for_testing(
+ static_cast<uint32_t>(state.range(0)),
+ static_cast<uint32_t>(state.range(1)));
+ while (serializer.Serialize(&buf)) {
+ }
+ benchmark::DoNotOptimize(buf.data());
+ buf.clear();
+ }
+ benchmark::ClobberMemory();
+}
+
+static void BM_QueryResultSerializer_Strings(benchmark::State& state) {
+ auto tp = TraceProcessor::CreateInstance(Config());
+ RunQueryChecked(tp.get(), "create virtual table win using window;");
+ RunQueryChecked(tp.get(),
+ "update win set window_start=0, window_dur=100000, quantum=1 "
+ "where rowid = 0");
+ VectorType buf;
+ for (auto _ : state) {
+ auto iter = tp->ExecuteQuery(
+ "select ts || '-' || ts , (dur * 1.0) || dur from win");
+ QueryResultSerializer serializer(std::move(iter));
+ serializer.set_batch_size_for_testing(
+ static_cast<uint32_t>(state.range(0)),
+ static_cast<uint32_t>(state.range(1)));
+ while (serializer.Serialize(&buf)) {
+ }
+ benchmark::DoNotOptimize(buf.data());
+ buf.clear();
+ }
+ benchmark::ClobberMemory();
+}
+
+BENCHMARK(BM_QueryResultSerializer_Mixed)->Apply(BenchmarkArgs);
+BENCHMARK(BM_QueryResultSerializer_Strings)->Apply(BenchmarkArgs);
diff --git a/src/trace_processor/rpc/query_result_serializer_unittest.cc b/src/trace_processor/rpc/query_result_serializer_unittest.cc
new file mode 100644
index 0000000..aca25ec
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer_unittest.cc
@@ -0,0 +1,435 @@
+
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/query_result_serializer.h"
+
+#include <deque>
+#include <ostream>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/trace_processor/basic_types.h"
+#include "perfetto/trace_processor/trace_processor.h"
+#include "test/gtest_and_gmock.h"
+
+#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+// For ASSERT_THAT(ElementsAre(...))
+inline bool operator==(const SqlValue& a, const SqlValue& b) {
+ if (a.type != b.type)
+ return false;
+ if (a.type == SqlValue::kString)
+ return strcmp(a.string_value, b.string_value) == 0;
+ if (a.type == SqlValue::kBytes) {
+ if (a.bytes_count != b.bytes_count)
+ return false;
+ return memcmp(a.bytes_value, b.bytes_value, a.bytes_count) == 0;
+ }
+ return a.long_value == b.long_value;
+}
+
+inline std::ostream& operator<<(std::ostream& stream, const SqlValue& v) {
+ stream << "SqlValue{";
+ switch (v.type) {
+ case SqlValue::kString:
+ return stream << "\"" << v.string_value << "\"}";
+ case SqlValue::kBytes:
+ return stream << "Bytes[" << v.bytes_count << "]:"
+ << base::ToHex(reinterpret_cast<const char*>(v.bytes_value),
+ v.bytes_count)
+ << "}";
+ case SqlValue::kLong:
+ return stream << "Long " << v.long_value << "}";
+ case SqlValue::kDouble:
+ return stream << "Double " << v.double_value << "}";
+ case SqlValue::kNull:
+ return stream << "NULL}";
+ }
+ return stream;
+}
+
+namespace {
+
+using ::testing::ElementsAre;
+using BatchProto = protos::pbzero::QueryResult::CellsBatch;
+using ResultProto = protos::pbzero::QueryResult;
+
+void RunQueryChecked(TraceProcessor* tp, const std::string& query) {
+ auto iter = tp->ExecuteQuery(query);
+ iter.Next();
+ ASSERT_TRUE(iter.Status().ok()) << iter.Status().message();
+}
+
+// Implements a minimal deserializer for QueryResultSerializer.
+class TestDeserializer {
+ public:
+ void SerializeAndDeserialize(QueryResultSerializer*);
+ void DeserializeBuffer(const uint8_t* start, size_t size);
+
+ std::vector<std::string> columns;
+ std::vector<SqlValue> cells;
+ std::string error;
+ bool eof_reached = false;
+
+ private:
+ std::vector<std::unique_ptr<char[]>> copied_buf_;
+};
+
+void TestDeserializer::SerializeAndDeserialize(
+ QueryResultSerializer* serializer) {
+ std::vector<uint8_t> buf;
+ error.clear();
+ for (eof_reached = false; !eof_reached;) {
+ serializer->Serialize(&buf);
+ DeserializeBuffer(buf.data(), buf.size());
+ buf.clear();
+ }
+}
+
+void TestDeserializer::DeserializeBuffer(const uint8_t* start, size_t size) {
+ ResultProto::Decoder result(start, size);
+ error += result.error().ToStdString();
+ for (auto it = result.column_names(); it; ++it)
+ columns.push_back(it->as_std_string());
+
+ for (auto batch_it = result.batch(); batch_it; ++batch_it) {
+ ASSERT_FALSE(eof_reached);
+ auto batch_bytes = batch_it->as_bytes();
+
+ ResultProto::CellsBatch::Decoder batch(batch_bytes.data, batch_bytes.size);
+ eof_reached = batch.is_last_batch();
+ std::deque<int64_t> varints;
+ std::deque<double> doubles;
+ std::deque<std::string> blobs;
+
+ bool parse_error = false;
+ for (auto it = batch.varint_cells(&parse_error); it; ++it)
+ varints.emplace_back(*it);
+
+ for (auto it = batch.float64_cells(&parse_error); it; ++it)
+ doubles.emplace_back(*it);
+
+ for (auto it = batch.blob_cells(); it; ++it)
+ blobs.emplace_back((*it).ToStdString());
+
+ std::string merged_strings = batch.string_cells().ToStdString();
+ std::deque<std::string> strings;
+ for (size_t pos = 0; pos < merged_strings.size();) {
+ // Will return npos for the last string, but it's fine
+ size_t next_sep = merged_strings.find('\0', pos);
+ strings.emplace_back(merged_strings.substr(pos, next_sep - pos));
+ pos = next_sep == std::string::npos ? next_sep : next_sep + 1;
+ }
+
+ for (auto it = batch.cells(&parse_error); it; ++it) {
+ uint8_t cell_type = static_cast<uint8_t>(*it);
+ switch (cell_type) {
+ case BatchProto::CELL_INVALID:
+ break;
+ case BatchProto::CELL_NULL:
+ cells.emplace_back(SqlValue());
+ break;
+ case BatchProto::CELL_VARINT:
+ ASSERT_GT(varints.size(), 0u);
+ cells.emplace_back(SqlValue::Long(varints.front()));
+ varints.pop_front();
+ break;
+ case BatchProto::CELL_FLOAT64:
+ ASSERT_GT(doubles.size(), 0u);
+ cells.emplace_back(SqlValue::Double(doubles.front()));
+ doubles.pop_front();
+ break;
+ case BatchProto::CELL_STRING: {
+ ASSERT_GT(strings.size(), 0u);
+ const std::string& str = strings.front();
+ copied_buf_.emplace_back(new char[str.size() + 1]);
+ char* new_buf = copied_buf_.back().get();
+ memcpy(new_buf, str.c_str(), str.size() + 1);
+ cells.emplace_back(SqlValue::String(new_buf));
+ strings.pop_front();
+ break;
+ }
+ case BatchProto::CELL_BLOB: {
+ ASSERT_GT(blobs.size(), 0u);
+ auto bytes = blobs.front();
+ copied_buf_.emplace_back(new char[bytes.size()]);
+ memcpy(copied_buf_.back().get(), bytes.data(), bytes.size());
+ cells.emplace_back(
+ SqlValue::Bytes(copied_buf_.back().get(), bytes.size()));
+ blobs.pop_front();
+ break;
+ }
+ default:
+ FAIL() << "Unknown cell type " << cell_type;
+ }
+
+ EXPECT_FALSE(parse_error);
+ }
+ }
+}
+
+TEST(QueryResultSerializerTest, ShortBatch) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+ auto iter = tp->ExecuteQuery(
+ "select 1 as i8, 128 as i16, 100000 as i32, 42001001001 as i64, 1e9 as "
+ "f64, 'a_string' as str, cast('a_blob' as blob) as blb");
+ QueryResultSerializer ser(std::move(iter));
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+
+ EXPECT_THAT(deser.columns,
+ ElementsAre("i8", "i16", "i32", "i64", "f64", "str", "blb"));
+ EXPECT_THAT(deser.cells,
+ ElementsAre(SqlValue::Long(1), SqlValue::Long(128),
+ SqlValue::Long(100000), SqlValue::Long(42001001001),
+ SqlValue::Double(1e9), SqlValue::String("a_string"),
+ SqlValue::Bytes("a_blob", 6)));
+}
+
+TEST(QueryResultSerializerTest, LongBatch) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+ RunQueryChecked(tp.get(), "create virtual table win using window;");
+ RunQueryChecked(tp.get(),
+ "update win set window_start=0, window_dur=8192, quantum=1 "
+ "where rowid = 0");
+
+ auto iter = tp->ExecuteQuery(
+ "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
+ QueryResultSerializer ser(std::move(iter));
+
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+
+ ASSERT_THAT(deser.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
+ ASSERT_EQ(deser.cells.size(), 4 * 8192u);
+ for (uint32_t row = 0; row < 1024; row++) {
+ uint32_t cell = row * 4;
+ ASSERT_EQ(deser.cells[cell].type, SqlValue::kString);
+ ASSERT_STREQ(deser.cells[cell].string_value, "x");
+
+ ASSERT_EQ(deser.cells[cell + 1].type, SqlValue::kLong);
+ ASSERT_EQ(deser.cells[cell + 1].long_value, row);
+
+ ASSERT_EQ(deser.cells[cell + 2].type, SqlValue::kDouble);
+ ASSERT_EQ(deser.cells[cell + 2].double_value, 1.0);
+
+ ASSERT_EQ(deser.cells[cell + 3].type, SqlValue::kLong);
+ ASSERT_EQ(deser.cells[cell + 3].long_value, row);
+ }
+}
+
+TEST(QueryResultSerializerTest, BatchSaturatingBinaryPayload) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+ RunQueryChecked(tp.get(), "create virtual table win using window;");
+ RunQueryChecked(tp.get(),
+ "update win set window_start=0, window_dur=1024, quantum=1 "
+ "where rowid = 0");
+ auto iter = tp->ExecuteQuery(
+ "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
+ QueryResultSerializer ser(std::move(iter));
+ ser.set_batch_size_for_testing(1024, 32);
+
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+
+ ASSERT_THAT(deser.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
+ ASSERT_EQ(deser.cells.size(), 1024 * 4u);
+}
+
+TEST(QueryResultSerializerTest, BatchSaturatingNumCells) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+ RunQueryChecked(tp.get(), "create virtual table win using window;");
+ RunQueryChecked(tp.get(),
+ "update win set window_start=0, window_dur=4, quantum=1 "
+ "where rowid = 0");
+ auto iter = tp->ExecuteQuery(
+ "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win");
+ QueryResultSerializer ser(std::move(iter));
+ ser.set_batch_size_for_testing(16, 4096);
+
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+
+ ASSERT_THAT(deser.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
+ ASSERT_EQ(deser.cells.size(), 16u);
+}
+
+TEST(QueryResultSerializerTest, LargeStringAndBlobs) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+ RunQueryChecked(tp.get(), "create table tab (colz);");
+
+ std::minstd_rand0 rnd_engine(0);
+ std::vector<SqlValue> expected;
+ std::string sql_values;
+ std::deque<std::string> string_buf; // Needs stable pointers
+ for (size_t n = 0; n < 32; n++) {
+ std::string very_long_str;
+ size_t len = (rnd_engine() % 4) * 32 * 1024;
+ very_long_str.resize(len);
+ for (size_t i = 0; i < very_long_str.size(); i++)
+ very_long_str[i] = 'A' + ((n * 11 + i) % 25);
+
+ if (n % 4 == 0) {
+ sql_values += "(NULL),";
+ expected.emplace_back(SqlValue()); // NULL.
+ } else if (n % 4 == 1) {
+ // Blob
+ sql_values += "(X'" + base::ToHex(very_long_str) + "'),";
+ string_buf.emplace_back(std::move(very_long_str));
+ expected.emplace_back(
+ SqlValue::Bytes(string_buf.back().data(), string_buf.back().size()));
+ } else {
+ sql_values += "('" + very_long_str + "'),";
+ string_buf.emplace_back(std::move(very_long_str));
+ expected.emplace_back(SqlValue::String(string_buf.back().c_str()));
+ }
+ }
+ sql_values.resize(sql_values.size() - 1); // Remove trailing comma.
+ RunQueryChecked(tp.get(), "insert into tab (colz) values " + sql_values);
+
+ auto iter = tp->ExecuteQuery("select colz from tab");
+ QueryResultSerializer ser(std::move(iter));
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+ ASSERT_EQ(deser.cells.size(), expected.size());
+ for (size_t i = 0; i < expected.size(); i++) {
+ EXPECT_EQ(deser.cells[i], expected[i]) << "Cell " << i;
+ }
+}
+
+TEST(QueryResultSerializerTest, RandomSizes) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+ static constexpr uint32_t kNumCells = 3 * 1000;
+
+ RunQueryChecked(tp.get(), "create table tab (a, b, c);");
+ std::vector<SqlValue> expected;
+ expected.reserve(kNumCells);
+ std::deque<std::string> string_buf; // Needs stable pointers
+ std::minstd_rand0 rnd_engine(0);
+ std::string insert_values;
+
+ for (uint32_t i = 0; i < kNumCells; i++) {
+ const uint32_t col = i % 3;
+ if (col == 0)
+ insert_values += "(";
+ int type = rnd_engine() % 5;
+ if (type == 0) {
+ expected.emplace_back(SqlValue()); // NULL
+ insert_values += "NULL";
+ } else if (type == 1) {
+ expected.emplace_back(SqlValue::Long(static_cast<long>(rnd_engine())));
+ insert_values += std::to_string(expected.back().long_value);
+ } else if (type == 2) {
+ expected.emplace_back(SqlValue::Double(rnd_engine() * 1.0));
+ insert_values += std::to_string(expected.back().double_value);
+ } else if (type == 3 || type == 4) {
+ size_t len = (rnd_engine() % 5) * 32;
+ std::string rndstr;
+ rndstr.resize(len);
+ for (size_t n = 0; n < len; n++)
+ rndstr[n] = static_cast<char>(rnd_engine() % 256);
+ auto rndstr_hex = base::ToHex(rndstr);
+ if (type == 3) {
+ insert_values += "\"" + rndstr_hex + "\"";
+ string_buf.emplace_back(std::move(rndstr_hex));
+ expected.emplace_back(SqlValue::String(string_buf.back().c_str()));
+
+ } else {
+ insert_values += "X'" + rndstr_hex + "'";
+ string_buf.emplace_back(std::move(rndstr));
+ expected.emplace_back(SqlValue::Bytes(string_buf.back().data(),
+ string_buf.back().size()));
+ }
+ }
+
+ if (col < 2) {
+ insert_values += ",";
+ } else {
+ insert_values += "),";
+ if (insert_values.size() > 1024 * 1024 || i == kNumCells - 1) {
+ insert_values[insert_values.size() - 1] = ';';
+ auto query = "insert into tab (a,b,c) values " + insert_values;
+ insert_values = "";
+ RunQueryChecked(tp.get(), query);
+ }
+ }
+ }
+
+ // Serialize and de-serialize with different batch and payload sizes.
+ for (int rep = 0; rep < 10; rep++) {
+ auto iter = tp->ExecuteQuery("select * from tab");
+ QueryResultSerializer ser(std::move(iter));
+ uint32_t cells_per_batch = 1 << (rnd_engine() % 8 + 2);
+ uint32_t binary_payload_size = 1 << (rnd_engine() % 8 + 8);
+ ser.set_batch_size_for_testing(cells_per_batch, binary_payload_size);
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+ ASSERT_EQ(deser.cells.size(), expected.size());
+ for (size_t i = 0; i < expected.size(); i++) {
+ EXPECT_EQ(deser.cells[i], expected[i]) << "Cell " << i;
+ }
+ }
+}
+
+TEST(QueryResultSerializerTest, ErrorBeforeStartingQuery) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+ auto iter = tp->ExecuteQuery("insert into incomplete_input");
+ QueryResultSerializer ser(std::move(iter));
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+ EXPECT_EQ(deser.cells.size(), 0u);
+ EXPECT_EQ(deser.error, "incomplete input");
+ EXPECT_TRUE(deser.eof_reached);
+}
+
+TEST(QueryResultSerializerTest, ErrorAfterSomeResults) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+ RunQueryChecked(tp.get(), "create table tab (x)");
+ RunQueryChecked(tp.get(), "insert into tab (x) values (0), (1), ('error')");
+ auto iter = tp->ExecuteQuery("select str_split('a;b', ';', x) as s from tab");
+ QueryResultSerializer ser(std::move(iter));
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+ EXPECT_NE(deser.error, "");
+ EXPECT_THAT(deser.cells,
+ ElementsAre(SqlValue::String("a"), SqlValue::String("b")));
+ EXPECT_TRUE(deser.eof_reached);
+}
+
+TEST(QueryResultSerializerTest, NoResultQuery) {
+ auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+ auto iter = tp->ExecuteQuery("create table tab (x)");
+ QueryResultSerializer ser(std::move(iter));
+ TestDeserializer deser;
+ deser.SerializeAndDeserialize(&ser);
+ EXPECT_EQ(deser.error, "");
+ EXPECT_EQ(deser.cells.size(), 0u);
+ EXPECT_TRUE(deser.eof_reached);
+}
+
+} // namespace
+} // namespace trace_processor
+} // namespace perfetto