TraceProcessor: Add binary RPC interface

Doc: https://docs.google.com/document/d/1Xno5BovxYweoSC6G3UjcsVJD65FLFTUjzUR0FXUmt0s/edit?usp=sharing
This CL doesn't wire it up to RPC or Wasm yet.
Follow-up CLs will do that.

Test: perfetto_unittests --gtest_filter=QueryResultSerializerTest.*
Change-Id: I08b7bb638fe4deabf41e14e30fc548aec9ab58e4
Bug: 159142289
diff --git a/Android.bp b/Android.bp
index ce5d302..dd7c365 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3131,6 +3131,158 @@
   ],
 }
 
+// GN: //protos/perfetto/metrics/android:zero
+genrule {
+  name: "perfetto_protos_perfetto_metrics_android_zero_gen",
+  srcs: [
+    "protos/perfetto/metrics/android/batt_metric.proto",
+    "protos/perfetto/metrics/android/cpu_metric.proto",
+    "protos/perfetto/metrics/android/display_metrics.proto",
+    "protos/perfetto/metrics/android/heap_profile_callsites.proto",
+    "protos/perfetto/metrics/android/hwui_metric.proto",
+    "protos/perfetto/metrics/android/ion_metric.proto",
+    "protos/perfetto/metrics/android/java_heap_histogram.proto",
+    "protos/perfetto/metrics/android/java_heap_stats.proto",
+    "protos/perfetto/metrics/android/lmk_metric.proto",
+    "protos/perfetto/metrics/android/lmk_reason_metric.proto",
+    "protos/perfetto/metrics/android/mem_metric.proto",
+    "protos/perfetto/metrics/android/mem_unagg_metric.proto",
+    "protos/perfetto/metrics/android/package_list.proto",
+    "protos/perfetto/metrics/android/powrails_metric.proto",
+    "protos/perfetto/metrics/android/process_metadata.proto",
+    "protos/perfetto/metrics/android/startup_metric.proto",
+    "protos/perfetto/metrics/android/surfaceflinger.proto",
+    "protos/perfetto/metrics/android/task_names.proto",
+    "protos/perfetto/metrics/android/thread_time_in_state_metric.proto",
+    "protos/perfetto/metrics/android/unmapped_java_symbols.proto",
+    "protos/perfetto/metrics/android/unsymbolized_frames.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "protozero_plugin",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+  out: [
+    "external/perfetto/protos/perfetto/metrics/android/batt_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/cpu_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/display_metrics.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/heap_profile_callsites.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/hwui_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/ion_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/java_heap_histogram.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/java_heap_stats.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/lmk_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/lmk_reason_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/mem_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/mem_unagg_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/package_list.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/powrails_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/process_metadata.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/startup_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/surfaceflinger.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/task_names.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/thread_time_in_state_metric.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/unmapped_java_symbols.pbzero.cc",
+    "external/perfetto/protos/perfetto/metrics/android/unsymbolized_frames.pbzero.cc",
+  ],
+}
+
+// GN: //protos/perfetto/metrics/android:zero
+genrule {
+  name: "perfetto_protos_perfetto_metrics_android_zero_gen_headers",
+  srcs: [
+    "protos/perfetto/metrics/android/batt_metric.proto",
+    "protos/perfetto/metrics/android/cpu_metric.proto",
+    "protos/perfetto/metrics/android/display_metrics.proto",
+    "protos/perfetto/metrics/android/heap_profile_callsites.proto",
+    "protos/perfetto/metrics/android/hwui_metric.proto",
+    "protos/perfetto/metrics/android/ion_metric.proto",
+    "protos/perfetto/metrics/android/java_heap_histogram.proto",
+    "protos/perfetto/metrics/android/java_heap_stats.proto",
+    "protos/perfetto/metrics/android/lmk_metric.proto",
+    "protos/perfetto/metrics/android/lmk_reason_metric.proto",
+    "protos/perfetto/metrics/android/mem_metric.proto",
+    "protos/perfetto/metrics/android/mem_unagg_metric.proto",
+    "protos/perfetto/metrics/android/package_list.proto",
+    "protos/perfetto/metrics/android/powrails_metric.proto",
+    "protos/perfetto/metrics/android/process_metadata.proto",
+    "protos/perfetto/metrics/android/startup_metric.proto",
+    "protos/perfetto/metrics/android/surfaceflinger.proto",
+    "protos/perfetto/metrics/android/task_names.proto",
+    "protos/perfetto/metrics/android/thread_time_in_state_metric.proto",
+    "protos/perfetto/metrics/android/unmapped_java_symbols.proto",
+    "protos/perfetto/metrics/android/unsymbolized_frames.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "protozero_plugin",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+  out: [
+    "external/perfetto/protos/perfetto/metrics/android/batt_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/cpu_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/display_metrics.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/heap_profile_callsites.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/hwui_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/ion_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/java_heap_histogram.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/java_heap_stats.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/lmk_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/lmk_reason_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/mem_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/mem_unagg_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/package_list.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/powrails_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/process_metadata.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/startup_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/surfaceflinger.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/task_names.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/thread_time_in_state_metric.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/unmapped_java_symbols.pbzero.h",
+    "external/perfetto/protos/perfetto/metrics/android/unsymbolized_frames.pbzero.h",
+  ],
+  export_include_dirs: [
+    ".",
+    "protos",
+  ],
+}
+
+// GN: //protos/perfetto/metrics:zero
+genrule {
+  name: "perfetto_protos_perfetto_metrics_zero_gen",
+  srcs: [
+    "protos/perfetto/metrics/metrics.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "protozero_plugin",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+  out: [
+    "external/perfetto/protos/perfetto/metrics/metrics.pbzero.cc",
+  ],
+}
+
+// GN: //protos/perfetto/metrics:zero
+genrule {
+  name: "perfetto_protos_perfetto_metrics_zero_gen_headers",
+  srcs: [
+    "protos/perfetto/metrics/metrics.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "protozero_plugin",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+  out: [
+    "external/perfetto/protos/perfetto/metrics/metrics.pbzero.h",
+  ],
+  export_include_dirs: [
+    ".",
+    "protos",
+  ],
+}
+
 // GN: //protos/perfetto/trace/android:cpp
 genrule {
   name: "perfetto_protos_perfetto_trace_android_cpp_gen",
@@ -4863,6 +5015,42 @@
   ],
 }
 
+// GN: //protos/perfetto/trace_processor:zero
+genrule {
+  name: "perfetto_protos_perfetto_trace_processor_zero_gen",
+  srcs: [
+    "protos/perfetto/trace_processor/trace_processor.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "protozero_plugin",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+  out: [
+    "external/perfetto/protos/perfetto/trace_processor/trace_processor.pbzero.cc",
+  ],
+}
+
+// GN: //protos/perfetto/trace_processor:zero
+genrule {
+  name: "perfetto_protos_perfetto_trace_processor_zero_gen_headers",
+  srcs: [
+    "protos/perfetto/trace_processor/trace_processor.proto",
+  ],
+  tools: [
+    "aprotoc",
+    "protozero_plugin",
+  ],
+  cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+  out: [
+    "external/perfetto/protos/perfetto/trace_processor/trace_processor.pbzero.h",
+  ],
+  export_include_dirs: [
+    ".",
+    "protos",
+  ],
+}
+
 // GN: //protos/perfetto/trace/profiling:cpp
 genrule {
   name: "perfetto_protos_perfetto_trace_profiling_cpp_gen",
@@ -6556,6 +6744,23 @@
   ],
 }
 
+// GN: //src/trace_processor/rpc:rpc
+filegroup {
+  name: "perfetto_src_trace_processor_rpc_rpc",
+  srcs: [
+    "src/trace_processor/rpc/query_result_serializer.cc",
+    "src/trace_processor/rpc/rpc.cc",
+  ],
+}
+
+// GN: //src/trace_processor/rpc:unittests
+filegroup {
+  name: "perfetto_src_trace_processor_rpc_unittests",
+  srcs: [
+    "src/trace_processor/rpc/query_result_serializer_unittest.cc",
+  ],
+}
+
 // GN: //src/trace_processor/sqlite:sqlite
 filegroup {
   name: "perfetto_src_trace_processor_sqlite_sqlite",
@@ -7550,6 +7755,8 @@
     ":perfetto_protos_perfetto_ipc_cpp_gen",
     ":perfetto_protos_perfetto_ipc_ipc_gen",
     ":perfetto_protos_perfetto_ipc_wire_protocol_cpp_gen",
+    ":perfetto_protos_perfetto_metrics_android_zero_gen",
+    ":perfetto_protos_perfetto_metrics_zero_gen",
     ":perfetto_protos_perfetto_trace_android_cpp_gen",
     ":perfetto_protos_perfetto_trace_android_zero_gen",
     ":perfetto_protos_perfetto_trace_chrome_cpp_gen",
@@ -7571,6 +7778,7 @@
     ":perfetto_protos_perfetto_trace_power_cpp_gen",
     ":perfetto_protos_perfetto_trace_power_zero_gen",
     ":perfetto_protos_perfetto_trace_processor_metrics_impl_zero_gen",
+    ":perfetto_protos_perfetto_trace_processor_zero_gen",
     ":perfetto_protos_perfetto_trace_profiling_cpp_gen",
     ":perfetto_protos_perfetto_trace_profiling_zero_gen",
     ":perfetto_protos_perfetto_trace_ps_cpp_gen",
@@ -7637,6 +7845,8 @@
     ":perfetto_src_trace_processor_metatrace",
     ":perfetto_src_trace_processor_metrics_lib",
     ":perfetto_src_trace_processor_metrics_unittests",
+    ":perfetto_src_trace_processor_rpc_rpc",
+    ":perfetto_src_trace_processor_rpc_unittests",
     ":perfetto_src_trace_processor_sqlite_sqlite",
     ":perfetto_src_trace_processor_sqlite_unittests",
     ":perfetto_src_trace_processor_storage_full",
@@ -7741,6 +7951,8 @@
     "perfetto_protos_perfetto_ipc_cpp_gen_headers",
     "perfetto_protos_perfetto_ipc_ipc_gen_headers",
     "perfetto_protos_perfetto_ipc_wire_protocol_cpp_gen_headers",
+    "perfetto_protos_perfetto_metrics_android_zero_gen_headers",
+    "perfetto_protos_perfetto_metrics_zero_gen_headers",
     "perfetto_protos_perfetto_trace_android_cpp_gen_headers",
     "perfetto_protos_perfetto_trace_android_zero_gen_headers",
     "perfetto_protos_perfetto_trace_chrome_cpp_gen_headers",
@@ -7762,6 +7974,7 @@
     "perfetto_protos_perfetto_trace_power_cpp_gen_headers",
     "perfetto_protos_perfetto_trace_power_zero_gen_headers",
     "perfetto_protos_perfetto_trace_processor_metrics_impl_zero_gen_headers",
+    "perfetto_protos_perfetto_trace_processor_zero_gen_headers",
     "perfetto_protos_perfetto_trace_profiling_cpp_gen_headers",
     "perfetto_protos_perfetto_trace_profiling_zero_gen_headers",
     "perfetto_protos_perfetto_trace_ps_cpp_gen_headers",
diff --git a/BUILD b/BUILD
index fa4dc90..13ad87a 100644
--- a/BUILD
+++ b/BUILD
@@ -815,6 +815,8 @@
 filegroup(
     name = "src_trace_processor_rpc_rpc",
     srcs = [
+        "src/trace_processor/rpc/query_result_serializer.cc",
+        "src/trace_processor/rpc/query_result_serializer.h",
         "src/trace_processor/rpc/rpc.cc",
         "src/trace_processor/rpc/rpc.h",
     ],
diff --git a/gn/perfetto_benchmarks.gni b/gn/perfetto_benchmarks.gni
index 46e067c..ca2e36a 100644
--- a/gn/perfetto_benchmarks.gni
+++ b/gn/perfetto_benchmarks.gni
@@ -25,6 +25,7 @@
   "src/traced/probes/ftrace:benchmarks",
   "src/tracing/core:benchmarks",
   "src/tracing:benchmarks",
+  "src/trace_processor/rpc:benchmarks",
   "test:benchmark_main",
   "test:end_to_end_benchmarks",
 ]
diff --git a/include/perfetto/trace_processor/basic_types.h b/include/perfetto/trace_processor/basic_types.h
index cc41758..7cdfc1a 100644
--- a/include/perfetto/trace_processor/basic_types.h
+++ b/include/perfetto/trace_processor/basic_types.h
@@ -85,6 +85,14 @@
     return value;
   }
 
+  static SqlValue Bytes(const void* v, size_t size) {
+    SqlValue value;
+    value.bytes_value = v;
+    value.bytes_count = size;
+    value.type = Type::kBytes;
+    return value;
+  }
+
   double AsDouble() const {
     PERFETTO_CHECK(type == kDouble);
     return double_value;
diff --git a/include/perfetto/trace_processor/iterator.h b/include/perfetto/trace_processor/iterator.h
index 9578f8f..f61f5d7 100644
--- a/include/perfetto/trace_processor/iterator.h
+++ b/include/perfetto/trace_processor/iterator.h
@@ -65,6 +65,15 @@
   util::Status Status();
 
  private:
+  friend class QueryResultSerializer;
+
+  // This allows QueryResultSerializer, which is very perf sensitive, to take
+  // ownership of iterator_ directly and avoid one extra function call per cell.
+  template <typename T = IteratorImpl>
+  std::unique_ptr<T> take_impl() {
+    return std::move(iterator_);
+  }
+
   // A PIMPL pattern is used to avoid leaking the dependencies on sqlite3.h and
   // other internal classes.
   std::unique_ptr<IteratorImpl> iterator_;
diff --git a/protos/perfetto/trace_processor/trace_processor.proto b/protos/perfetto/trace_processor/trace_processor.proto
index 66bbe63..6dc3fe1 100644
--- a/protos/perfetto/trace_processor/trace_processor.proto
+++ b/protos/perfetto/trace_processor/trace_processor.proto
@@ -44,6 +44,7 @@
 }
 
 // Output for the /raw_query endpoint.
+// DEPRECATED, use /query. See QueryResult below.
 message RawQueryResult {
   message ColumnDesc {
     optional string name = 1;
@@ -74,6 +75,60 @@
   optional uint64 execution_time_ns = 5;
 }
 
+// Output for the /query endpoint.
+// Returns a query result set, grouping cells into batches. Batching allows a
+// more efficient encoding of results, at the same time allowing to return
+// O(M) results in a pipelined fashion, without full-memory buffering.
+// Batches are split when either a large number of cells (~thousands) is reached
+// or the string/blob payload becomes too large (~hundreds of KB).
+// Data is batched in cells, scanning results by row -> column. e.g. if a query
+// returns 3 columns and 2 rows, the cells will be emitted in this order:
+// R0C0, R0C1, R0C2, R1C0, R1C1, R1C2.
+message QueryResult {
+  // This determines the number and names of columns.
+  repeated string column_names = 1;
+
+  // If non-empty the query returned an error. Note that some cells might still
+  // be present, if the error happened while iterating.
+  optional string error = 2;
+
+  // A batch contains an array of cell headers, stating the type of each cell.
+  // The payload of each cell is stored in the corresponding xxx_cells field
+  // below (unless the cell is NULL).
+  // So if |cells| contains: [VARINT, FLOAT64, VARINT, STRING], the results will
+  // be available as:
+  // [varint_cells[0], float64_cells[0], varint_cells[1], string_cells[0]].
+  message CellsBatch {
+    enum CellType {
+      CELL_INVALID = 0;
+      CELL_NULL = 1;
+      CELL_VARINT = 2;
+      CELL_FLOAT64 = 3;
+      CELL_STRING = 4;
+      CELL_BLOB = 5;
+    }
+    repeated CellType cells = 1 [packed = true];
+
+    repeated int64 varint_cells = 2 [packed = true];
+    repeated double float64_cells = 3 [packed = true];
+    repeated bytes blob_cells = 4;
+
+    // The string cells are concatenated in a single field. Each cell is
+    // NUL-terminated. This is because JS incurs a non-negligible overhead
+    // when decoding strings and one decode + split('\0') is measurably faster
+    // than decoding N strings. See goto.google.com/postmessage-benchmark .
+    // \0-concatenated.
+    optional string string_cells = 5;
+
+    // If true this is the last batch for the query result.
+    optional bool is_last_batch = 6;
+
+    // Padding field. Used only to re-align and fill gaps in the binary format.
+    reserved 7;
+  }
+  repeated CellsBatch batch = 3;
+}
+
 // Input for the /status endpoint.
 message StatusArgs {}
 
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index cc061ae..02925da 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -299,7 +299,6 @@
       ":metatrace",
       ":storage_full",
       "../../gn:default_deps",
-      "../../gn:sqlite",
       "../../protos/perfetto/trace/ftrace:zero",
       "../base",
       "analysis",
@@ -311,7 +310,10 @@
       "tables",
       "types",
     ]
-    public_deps = [ "../../include/perfetto/trace_processor" ]
+    public_deps = [
+      "../../gn:sqlite",  # iterator_impl.h includes sqlite3.h.
+      "../../include/perfetto/trace_processor",
+    ]
     if (enable_perfetto_trace_processor_json) {
       deps += [ "../../gn:jsoncpp" ]
     }
@@ -381,6 +383,7 @@
     "db:unittests",
     "importers:common",
     "importers:unittests",
+    "rpc:unittests",
     "storage",
     "tables:unittests",
     "types",
diff --git a/src/trace_processor/rpc/BUILD.gn b/src/trace_processor/rpc/BUILD.gn
index 15690b2..c81552c 100644
--- a/src/trace_processor/rpc/BUILD.gn
+++ b/src/trace_processor/rpc/BUILD.gn
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import("../../../gn/perfetto.gni")
+import("../../../gn/test.gni")
 import("../../../gn/wasm.gni")
 
 # Prevent that this file is accidentally included in embedder builds.
@@ -22,10 +23,13 @@
 # interface) and by the :httpd module for the HTTP interface.
 source_set("rpc") {
   sources = [
+    "query_result_serializer.cc",
+    "query_result_serializer.h",
     "rpc.cc",
     "rpc.h",
   ]
   deps = [
+    "..:lib",
     "..:metatrace",
     "../../../gn:default_deps",
     "../../../include/perfetto/trace_processor",
@@ -35,6 +39,19 @@
   ]
 }
 
+perfetto_unittest_source_set("unittests") {
+  testonly = true
+  sources = [ "query_result_serializer_unittest.cc" ]
+  deps = [
+    ":rpc",
+    "..:lib",
+    "../../../gn:default_deps",
+    "../../../gn:gtest_and_gmock",
+    "../../../protos/perfetto/trace_processor:zero",
+    "../../base",
+  ]
+}
+
 if (enable_perfetto_trace_processor_httpd) {
   source_set("httpd") {
     sources = [
@@ -64,3 +81,18 @@
     ]
   }
 }
+
+if (enable_perfetto_benchmarks) {
+  source_set("benchmarks") {
+    testonly = true
+    deps = [
+      ":rpc",
+      "..:lib",
+      "../../../gn:benchmark",
+      "../../../gn:default_deps",
+      "../../../gn:sqlite",
+      "../../base",
+    ]
+    sources = [ "query_result_serializer_benchmark.cc" ]
+  }
+}
diff --git a/src/trace_processor/rpc/query_result_serializer.cc b/src/trace_processor/rpc/query_result_serializer.cc
new file mode 100644
index 0000000..be44297
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer.cc
@@ -0,0 +1,297 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/query_result_serializer.h"
+
+#include <vector>
+
+#include "perfetto/protozero/packed_repeated_fields.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+#include "src/trace_processor/iterator_impl.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+namespace {
+
+namespace pu = ::protozero::proto_utils;
+using BatchProto = protos::pbzero::QueryResult::CellsBatch;
+using ResultProto = protos::pbzero::QueryResult;
+
+// The reserved field in trace_processor.proto.
+static constexpr uint32_t kPaddingFieldId = 7;
+
+uint8_t MakeLenDelimTag(uint32_t field_num) {
+  uint32_t tag = pu::MakeTagLengthDelimited(field_num);
+  PERFETTO_DCHECK(tag <= 127);  // Must fit in one byte.
+  return static_cast<uint8_t>(tag);
+}
+
+}  // namespace
+
+QueryResultSerializer::QueryResultSerializer(Iterator iter)
+    : iter_(iter.take_impl()), num_cols_(iter_->ColumnCount()) {}
+
+QueryResultSerializer::~QueryResultSerializer() = default;
+
+bool QueryResultSerializer::Serialize(std::vector<uint8_t>* buf) {
+  PERFETTO_CHECK(!eof_reached_);
+
+  // In non-production builds avoid the big reservation. This is to avoid hiding
+  // bugs that accidentally depend on pointer stability across resizes.
+#if !PERFETTO_DCHECK_IS_ON()
+  buf->reserve(buf->size() + batch_split_threshold_ + 4096);
+#endif
+
+  if (!did_write_column_names_) {
+    SerializeColumnNames(buf);
+    did_write_column_names_ = true;
+  }
+
+  // In case of an error we still want to go through SerializeBatch(). That will
+  // write an empty batch with the EOF marker. Errors can happen also in the
+  // middle of a query, not just before starting it.
+
+  SerializeBatch(buf);
+  MaybeSerializeError(buf);
+
+  return !eof_reached_;
+}
+
+void QueryResultSerializer::SerializeBatch(std::vector<uint8_t>* buf) {
+  // The buffer is filled in this way:
+  // - Append all the strings as we iterate through the results. The rationale
+  //   is that strings are typically the largest part of the result and we want
+  //   to avoid copying these.
+  // - While iterating, buffer all other types of cells. They will be appended
+  //   at the end of the batch, after the string payload is known.
+
+  // Note: this function uses uint32_t instead of size_t because Wasm doesn't
+  // yet have native 64-bit integers and this is perf-sensitive.
+  const uint32_t initial_size = static_cast<uint32_t>(buf->size());
+
+  buf->push_back(MakeLenDelimTag(ResultProto::kBatchFieldNumber));
+  const uint32_t batch_size_hdr = static_cast<uint32_t>(buf->size());
+  buf->resize(batch_size_hdr + pu::kMessageLengthFieldSize);
+
+  // Start the |string_cells|.
+  buf->push_back(MakeLenDelimTag(BatchProto::kStringCellsFieldNumber));
+  const uint32_t strings_hdr_off = static_cast<uint32_t>(buf->size());
+  buf->resize(strings_hdr_off + pu::kMessageLengthFieldSize);
+  const uint32_t strings_start_off = static_cast<uint32_t>(buf->size());
+
+  // This keeps track of the overall size of the batch. It is used to decide if
+  // we need to prematurely end the batch, even if the batch_split_threshold_ is
+  // not reached. This is to guard against the degenerate case of appending a
+  // lot of very large strings and ending up with an enormous batch.
+  auto approx_batch_size = static_cast<uint32_t>(buf->size()) - initial_size;
+
+  std::vector<uint8_t> cell_types(cells_per_batch_);
+
+  // Varints and doubles are written on stack-based storage and appended later.
+  protozero::PackedVarInt varints;
+  protozero::PackedFixedSizeInt<double> doubles;
+
+  // We write blobs on a temporary heap buffer and append it at the end. Blobs
+  // are extremely rare, trying to avoid copies is not worth the complexity.
+  std::vector<uint8_t> blobs;
+
+  uint32_t cell_idx = 0;
+  bool batch_full = false;
+
+  // Skip block if the query didn't return any result (e.g. CREATE TABLE).
+  while (num_cols_ > 0) {
+    // Next needs to be called before iterating on a row. col_ is initialized
+    // at MAX_INT in the constructor, so in the very first iteration this causes
+    // a Next() call.
+    if (col_ >= num_cols_) {
+      col_ = 0;
+      if (!iter_->Next())
+        break;  // EOF or error.
+    }
+
+    auto value = iter_->Get(col_);
+    uint8_t cell_type = BatchProto::CELL_INVALID;
+    switch (value.type) {
+      case SqlValue::Type::kNull: {
+        cell_type = BatchProto::CELL_NULL;
+        break;
+      }
+      case SqlValue::Type::kLong: {
+        cell_type = BatchProto::CELL_VARINT;
+        varints.Append(value.long_value);
+        approx_batch_size += 4;  // Just a guess, doesn't need to be accurate.
+        break;
+      }
+      case SqlValue::Type::kDouble: {
+        cell_type = BatchProto::CELL_FLOAT64;
+        approx_batch_size += sizeof(double);
+        doubles.Append(value.double_value);
+        break;
+      }
+      case SqlValue::Type::kString: {
+        // Append the string to the one |string_cells| proto field, just use
+        // \0 to separate each string. We are deliberately NOT emitting one
+        // proto repeated field for each string. Doing so significantly slows
+        // down parsing on the JS side (go/postmessage-benchmark).
+        cell_type = BatchProto::CELL_STRING;
+        uint32_t len_with_nul =
+            static_cast<uint32_t>(strlen(value.string_value)) + 1;
+        const char* str_begin = value.string_value;
+        buf->insert(buf->end(), str_begin, str_begin + len_with_nul);
+        approx_batch_size += len_with_nul;
+        break;
+      }
+      case SqlValue::Type::kBytes: {
+        // Each blob is stored as its own repeated proto field, unlike strings.
+        // Blobs don't incur text-decoding overhead (and are also rare).
+        cell_type = BatchProto::CELL_BLOB;
+        auto* src = static_cast<const uint8_t*>(value.bytes_value);
+        uint32_t len = static_cast<uint32_t>(value.bytes_count);
+        uint8_t preamble[16];
+        uint8_t* preamble_end = &preamble[0];
+        *(preamble_end++) = MakeLenDelimTag(BatchProto::kBlobCellsFieldNumber);
+        preamble_end = pu::WriteVarInt(len, preamble_end);
+        blobs.insert(blobs.end(), preamble, preamble_end);
+        blobs.insert(blobs.end(), src, src + len);
+        approx_batch_size += len + 4;  // 4 is a guess on the preamble size.
+        break;
+      }
+    }
+
+    PERFETTO_DCHECK(cell_type != BatchProto::CELL_INVALID);
+    cell_types[cell_idx] = cell_type;
+
+    // The wrapping + iter_->Next() is done in the beginning to deal with
+    // the very first Next() call in one place.
+    ++cell_idx;
+    ++col_;
+
+    if (cell_idx >= cells_per_batch_ ||
+        approx_batch_size > batch_split_threshold_) {
+      batch_full = true;
+      break;
+    }
+
+  }  // while (num_cols_ > 0)
+
+  // Backfill the string size.
+  auto strings_size = static_cast<uint32_t>(buf->size() - strings_start_off);
+  pu::WriteRedundantVarInt(strings_size, buf->data() + strings_hdr_off);
+
+  // Write the cells headers (1 byte per cell).
+  {
+    uint8_t preamble[16];
+    uint8_t* preamble_end = &preamble[0];
+    *(preamble_end++) = MakeLenDelimTag(BatchProto::kCellsFieldNumber);
+    preamble_end = pu::WriteVarInt(cell_idx, preamble_end);
+    buf->insert(buf->end(), preamble, preamble_end);
+    buf->insert(buf->end(), cell_types.data(), cell_types.data() + cell_idx);
+  }
+
+  // Append the |varint_cells|, copying over the packed varint buffer.
+  const uint32_t varints_size = static_cast<uint32_t>(varints.size());
+  if (varints_size > 0) {
+    uint8_t preamble[16];
+    uint8_t* preamble_end = &preamble[0];
+    *(preamble_end++) = MakeLenDelimTag(BatchProto::kVarintCellsFieldNumber);
+    preamble_end = pu::WriteVarInt(varints_size, preamble_end);
+    buf->insert(buf->end(), preamble, preamble_end);
+    buf->insert(buf->end(), varints.data(), varints.data() + varints_size);
+  }
+
+  // Append the |float64_cells|, copying over the packed fixed64 buffer. This is
+  // appended at a 64-bit aligned offset, so that JS can access these by
+  // overlaying a TypedArray, without extra copies.
+  const uint32_t doubles_size = static_cast<uint32_t>(doubles.size());
+  if (doubles_size > 0) {
+    uint8_t preamble[16];
+    uint8_t* preamble_end = &preamble[0];
+    *(preamble_end++) = MakeLenDelimTag(BatchProto::kFloat64CellsFieldNumber);
+    preamble_end = pu::WriteVarInt(doubles_size, preamble_end);
+    uint32_t preamble_size = static_cast<uint32_t>(preamble_end - &preamble[0]);
+
+    // The byte after the preamble must start at a 64bit-aligned offset.
+    // The padding needs to be > 1 Byte because of proto encoding.
+    const uint32_t off = static_cast<uint32_t>(buf->size() + preamble_size);
+    const uint32_t aligned_off = (off + 7) & ~7u;
+    uint32_t padding = aligned_off - off;
+    if (padding == 1)
+      padding = 9;
+
+    if (padding > 0) {
+      buf->push_back(pu::MakeTagVarInt(kPaddingFieldId));
+      for (uint32_t i = 0; i < padding - 2; i++)
+        buf->push_back(0x80);
+      buf->push_back(0);
+    }
+
+    buf->insert(buf->end(), preamble, preamble_end);
+    PERFETTO_CHECK(buf->size() % 8 == 0);
+    buf->insert(buf->end(), doubles.data(), doubles.data() + doubles_size);
+  }  // if (doubles_size > 0)
+
+  // Append the blobs.
+  buf->insert(buf->end(), blobs.begin(), blobs.end());
+
+  // If this is the last batch, write the EOF field.
+  if (!batch_full) {
+    eof_reached_ = true;
+    auto kEofTag = pu::MakeTagVarInt(BatchProto::kIsLastBatchFieldNumber);
+    buf->push_back(static_cast<uint8_t>(kEofTag));
+    buf->push_back(1);
+  }
+
+  // Finally backfill the size of the whole |batch| sub-message.
+  const uint32_t batch_size = static_cast<uint32_t>(
+      buf->size() - batch_size_hdr - pu::kMessageLengthFieldSize);
+  pu::WriteRedundantVarInt(batch_size, buf->data() + batch_size_hdr);
+}
+
+void QueryResultSerializer::MaybeSerializeError(std::vector<uint8_t>* buf) {
+  if (iter_->Status().ok())
+    return;
+  std::string err = iter_->Status().message();
+  // Make sure the |error| field is always non-empty if the query failed, so
+  // the client can tell some error happened.
+  if (err.empty())
+    err = "Unknown error";
+
+  // Write the error and return.
+  uint8_t preamble[16];
+  uint8_t* preamble_end = &preamble[0];
+  *(preamble_end++) = MakeLenDelimTag(ResultProto::kErrorFieldNumber);
+  preamble_end = pu::WriteVarInt(err.size(), preamble_end);
+  buf->insert(buf->end(), preamble, preamble_end);
+  buf->insert(buf->end(), err.begin(), err.end());
+}
+
+void QueryResultSerializer::SerializeColumnNames(std::vector<uint8_t>* buf) {
+  PERFETTO_DCHECK(!did_write_column_names_);
+  for (uint32_t c = 0; c < num_cols_; c++) {
+    std::string col_name = iter_->GetColumnName(c);
+    uint8_t preamble[16];
+    uint8_t* preamble_end = &preamble[0];
+    *(preamble_end++) = MakeLenDelimTag(ResultProto::kColumnNamesFieldNumber);
+    preamble_end = pu::WriteVarInt(col_name.size(), preamble_end);
+    buf->insert(buf->end(), preamble, preamble_end);
+    buf->insert(buf->end(), col_name.begin(), col_name.end());
+  }
+}
+
+}  // namespace trace_processor
+}  // namespace perfetto
diff --git a/src/trace_processor/rpc/query_result_serializer.h b/src/trace_processor/rpc/query_result_serializer.h
new file mode 100644
index 0000000..835f602
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_RPC_QUERY_RESULT_SERIALIZER_H_
+#define SRC_TRACE_PROCESSOR_RPC_QUERY_RESULT_SERIALIZER_H_
+
+#include <memory>
+#include <vector>
+
+#include <limits.h>
+#include <stddef.h>
+#include <stdint.h>
+
+namespace perfetto {
+namespace trace_processor {
+
+class Iterator;
+class IteratorImpl;
+
+// This class serializes a TraceProcessor query result (i.e. an Iterator)
+// into batches of QueryResult (trace_processor.proto). This class
+// returns results in batches, allowing to deal with O(M) results without
+// full memory buffering. It works as follows:
+// - The iterator is passed in the constructor.
+// - The client is expected to call Serialize(out_buf) until EOF is reached.
+// - For each Serialize() call, this class will serialize a batch of cells,
+//   stopping either when a number of cells (|cells_per_batch_|) is reached
+//   or when the batch size exceeds |batch_split_threshold_|.
+// The intended use case is streaming these batches through a
+// chunked-encoded HTTP response, or through a repetition of Wasm calls.
+class QueryResultSerializer {
+ public:
+  explicit QueryResultSerializer(Iterator);
+  ~QueryResultSerializer();
+
+  // No copy or move.
+  QueryResultSerializer(const QueryResultSerializer&) = delete;
+  QueryResultSerializer& operator=(const QueryResultSerializer&) = delete;
+
+  // Appends the data to the passed vector (note: does NOT clear() the vector
+  // before starting). It returns true if more chunks are available (i.e.
+  // it returns NOT(|eof_reached_|)). The caller is supposed to keep calling
+  // this function until it returns false.
+  bool Serialize(std::vector<uint8_t>*);
+
+  void set_batch_size_for_testing(uint32_t cells_per_batch, uint32_t thres) {
+    cells_per_batch_ = cells_per_batch;
+    batch_split_threshold_ = thres;
+  }
+
+ private:
+  void SerializeColumnNames(std::vector<uint8_t>*);
+  void SerializeBatch(std::vector<uint8_t>*);
+  void MaybeSerializeError(std::vector<uint8_t>*);
+
+  std::unique_ptr<IteratorImpl> iter_;
+  const uint32_t num_cols_;
+  bool did_write_column_names_ = false;
+  bool eof_reached_ = false;
+  uint32_t col_ = UINT32_MAX;
+
+  // Overridable for testing only.
+  uint32_t cells_per_batch_ = 2048;
+  uint32_t batch_split_threshold_ = 1024 * 32;
+};
+
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_RPC_QUERY_RESULT_SERIALIZER_H_
diff --git a/src/trace_processor/rpc/query_result_serializer_benchmark.cc b/src/trace_processor/rpc/query_result_serializer_benchmark.cc
new file mode 100644
index 0000000..339d6ea
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer_benchmark.cc
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/query_result_serializer.h"
+
+#include <benchmark/benchmark.h>
+
+#include "perfetto/trace_processor/basic_types.h"
+#include "perfetto/trace_processor/trace_processor.h"
+
+using perfetto::trace_processor::Config;
+using perfetto::trace_processor::QueryResultSerializer;
+using perfetto::trace_processor::TraceProcessor;
+using VectorType = std::vector<uint8_t>;
+
+namespace {
+// Returns true iff BENCHMARK_FUNCTIONAL_TEST_ONLY is set in the environment.
+bool IsBenchmarkFunctionalOnly() {
+  return getenv("BENCHMARK_FUNCTIONAL_TEST_ONLY") != nullptr;
+}
+
+// Picks the (cells per batch, split threshold) argument ranges of a benchmark.
+void BenchmarkArgs(benchmark::internal::Benchmark* b) {
+  if (IsBenchmarkFunctionalOnly())
+    b->Ranges({{1024, 1024}, {4096, 4096}});
+  else
+    b->RangeMultiplier(8)->Ranges({{128, 8192}, {4096, 1024 * 512}});
+}
+
+// Runs |query| and CHECK-fails if the status after the first step is not ok.
+void RunQueryChecked(TraceProcessor* tp, const std::string& query) {
+  auto iter = tp->ExecuteQuery(query);
+  iter.Next();
+  PERFETTO_CHECK(iter.Status().ok());
+}
+}  // namespace
+
+static void BM_QueryResultSerializer_Mixed(benchmark::State& state) {
+  auto tp = TraceProcessor::CreateInstance(Config());
+  RunQueryChecked(tp.get(), "create virtual table win using window;");
+  RunQueryChecked(tp.get(),
+                  "update win set window_start=0, window_dur=50000, quantum=1 "
+                  "where rowid = 0");
+  VectorType buf;
+  for (auto _ : state) {
+    QueryResultSerializer serializer(tp->ExecuteQuery(
+        "select dur || dur as x, ts, dur * 1.0 as dur, quantum_ts from win"));
+    const auto cells_per_batch = static_cast<uint32_t>(state.range(0));
+    const auto split_threshold = static_cast<uint32_t>(state.range(1));
+    serializer.set_batch_size_for_testing(cells_per_batch, split_threshold);
+    while (serializer.Serialize(&buf)) {
+      // Keep draining batches until the serializer reports EOF.
+    }
+    benchmark::DoNotOptimize(buf.data());
+    buf.clear();
+  }
+  benchmark::ClobberMemory();
+}
+
+static void BM_QueryResultSerializer_Strings(benchmark::State& state) {
+  auto tp = TraceProcessor::CreateInstance(Config());
+  RunQueryChecked(tp.get(), "create virtual table win using window;");
+  RunQueryChecked(tp.get(),
+                  "update win set window_start=0, window_dur=100000, quantum=1 "
+                  "where rowid = 0");
+  VectorType buf;
+  for (auto _ : state) {
+    QueryResultSerializer serializer(tp->ExecuteQuery(
+        "select  ts || '-' || ts , (dur * 1.0) || dur from win"));
+    const auto cells_per_batch = static_cast<uint32_t>(state.range(0));
+    const auto split_threshold = static_cast<uint32_t>(state.range(1));
+    serializer.set_batch_size_for_testing(cells_per_batch, split_threshold);
+    while (serializer.Serialize(&buf)) {
+      // Keep draining batches until the serializer reports EOF.
+    }
+    benchmark::DoNotOptimize(buf.data());
+    buf.clear();
+  }
+  benchmark::ClobberMemory();
+}
+
+BENCHMARK(BM_QueryResultSerializer_Mixed)->Apply(BenchmarkArgs);
+BENCHMARK(BM_QueryResultSerializer_Strings)->Apply(BenchmarkArgs);
diff --git a/src/trace_processor/rpc/query_result_serializer_unittest.cc b/src/trace_processor/rpc/query_result_serializer_unittest.cc
new file mode 100644
index 0000000..aca25ec
--- /dev/null
+++ b/src/trace_processor/rpc/query_result_serializer_unittest.cc
@@ -0,0 +1,435 @@
+
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/rpc/query_result_serializer.h"
+
+#include <deque>
+#include <ostream>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/trace_processor/basic_types.h"
+#include "perfetto/trace_processor/trace_processor.h"
+#include "test/gtest_and_gmock.h"
+
+#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+// For ASSERT_THAT(ElementsAre(...)). NULLs carry no payload and always match.
+inline bool operator==(const SqlValue& a, const SqlValue& b) {
+  if (a.type != b.type)
+    return false;
+  if (a.type == SqlValue::kNull)
+    return true;
+  if (a.type == SqlValue::kString)
+    return strcmp(a.string_value, b.string_value) == 0;
+  if (a.type != SqlValue::kBytes)
+    return a.long_value == b.long_value;  // kLong / kDouble: raw payload.
+  return a.bytes_count == b.bytes_count &&
+         memcmp(a.bytes_value, b.bytes_value, a.bytes_count) == 0;
+}
+
+inline std::ostream& operator<<(std::ostream& stream, const SqlValue& v) {
+  stream << "SqlValue{";
+  switch (v.type) {
+    case SqlValue::kString:
+      return stream << "\"" << v.string_value << "\"}";
+    case SqlValue::kBytes:
+      return stream << "Bytes[" << v.bytes_count << "]:"
+                    << base::ToHex(reinterpret_cast<const char*>(v.bytes_value),
+                                   v.bytes_count)
+                    << "}";
+    case SqlValue::kLong:
+      return stream << "Long " << v.long_value << "}";
+    case SqlValue::kDouble:
+      return stream << "Double " << v.double_value << "}";
+    case SqlValue::kNull:
+      return stream << "NULL}";
+  }
+  return stream;  // Only reached if |v.type| matches none of the cases above.
+}
+
+namespace {
+
+using ::testing::ElementsAre;
+using BatchProto = protos::pbzero::QueryResult::CellsBatch;
+using ResultProto = protos::pbzero::QueryResult;
+
+void RunQueryChecked(TraceProcessor* tp, const std::string& query) {
+  auto iter = tp->ExecuteQuery(query);
+  iter.Next();  // Only the status after this first step is checked.
+  ASSERT_TRUE(iter.Status().ok()) << iter.Status().message();
+}
+
+// Implements a minimal deserializer for QueryResultSerializer's output.
+class TestDeserializer {
+ public:
+  void SerializeAndDeserialize(QueryResultSerializer*);
+  void DeserializeBuffer(const uint8_t* start, size_t size);
+
+  std::vector<std::string> columns;  // Column names, in decoding order.
+  std::vector<SqlValue> cells;       // Every decoded cell, flattened.
+  std::string error;                 // Concatenation of all |error| fields.
+  bool eof_reached = false;          // Set from the batch's |is_last_batch|.
+
+ private:
+  std::vector<std::unique_ptr<char[]>> copied_buf_;  // Owns string/blob data.
+};
+
+void TestDeserializer::SerializeAndDeserialize(
+    QueryResultSerializer* serializer) {
+  // Drain the serializer one batch at a time until |is_last_batch| is seen.
+  std::vector<uint8_t> chunk;
+  error.clear();
+  for (eof_reached = false; !eof_reached; chunk.clear()) {
+    serializer->Serialize(&chunk);
+    DeserializeBuffer(chunk.data(), chunk.size());
+  }
+}
+
+void TestDeserializer::DeserializeBuffer(const uint8_t* start, size_t size) {
+  // Parses one QueryResult chunk, appending to |error|, |columns| and |cells|.
+  ResultProto::Decoder result(start, size);
+  error += result.error().ToStdString();
+  for (auto it = result.column_names(); it; ++it)
+    columns.push_back(it->as_std_string());
+
+  for (auto batch_it = result.batch(); batch_it; ++batch_it) {
+    ASSERT_FALSE(eof_reached);
+    auto batch_bytes = batch_it->as_bytes();
+
+    ResultProto::CellsBatch::Decoder batch(batch_bytes.data, batch_bytes.size);
+    eof_reached = batch.is_last_batch();
+    // Per-type payload queues, consumed in the order dictated by |cells|.
+    std::deque<int64_t> varints;
+    std::deque<double> doubles;
+    std::deque<std::string> blobs;
+
+    bool parse_error = false;
+    for (auto it = batch.varint_cells(&parse_error); it; ++it)
+      varints.emplace_back(*it);
+    for (auto it = batch.float64_cells(&parse_error); it; ++it)
+      doubles.emplace_back(*it);
+    for (auto it = batch.blob_cells(); it; ++it)
+      blobs.emplace_back((*it).ToStdString());
+
+    std::string merged_strings = batch.string_cells().ToStdString();
+    std::deque<std::string> strings;
+    for (size_t pos = 0; pos < merged_strings.size();) {
+      // npos (no further separator) makes substr() take the whole tail.
+      size_t next_sep = merged_strings.find('\0', pos);
+      strings.emplace_back(merged_strings.substr(pos, next_sep - pos));
+      pos = next_sep == std::string::npos ? next_sep : next_sep + 1;
+    }
+
+    for (auto it = batch.cells(&parse_error); it; ++it) {
+      uint8_t cell_type = static_cast<uint8_t>(*it);
+      switch (cell_type) {
+        case BatchProto::CELL_INVALID:
+          break;
+        case BatchProto::CELL_NULL:
+          cells.emplace_back(SqlValue());
+          break;
+        case BatchProto::CELL_VARINT:
+          ASSERT_GT(varints.size(), 0u);
+          cells.emplace_back(SqlValue::Long(varints.front()));
+          varints.pop_front();
+          break;
+        case BatchProto::CELL_FLOAT64:
+          ASSERT_GT(doubles.size(), 0u);
+          cells.emplace_back(SqlValue::Double(doubles.front()));
+          doubles.pop_front();
+          break;
+        case BatchProto::CELL_STRING: {
+          ASSERT_GT(strings.size(), 0u);
+          const std::string& str = strings.front();
+          copied_buf_.emplace_back(new char[str.size() + 1]);
+          char* new_buf = copied_buf_.back().get();
+          memcpy(new_buf, str.c_str(), str.size() + 1);
+          cells.emplace_back(SqlValue::String(new_buf));
+          strings.pop_front();
+          break;
+        }
+        case BatchProto::CELL_BLOB: {
+          ASSERT_GT(blobs.size(), 0u);
+          const std::string& bytes = blobs.front();
+          copied_buf_.emplace_back(new char[bytes.size()]);
+          char* new_buf = copied_buf_.back().get();
+          memcpy(new_buf, bytes.data(), bytes.size());
+          cells.emplace_back(SqlValue::Bytes(new_buf, bytes.size()));
+          blobs.pop_front();
+          break;
+        }
+        default:
+          FAIL() << "Unknown cell type " << static_cast<int>(cell_type);
+      }
+
+      EXPECT_FALSE(parse_error);
+    }
+  }
+}
+
+TEST(QueryResultSerializerTest, ShortBatch) {
+  // One row mixing integer, double, string and blob cells.
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+  QueryResultSerializer serializer(tp->ExecuteQuery(
+      "select 1 as i8, 128 as i16, 100000 as i32, 42001001001 as i64, 1e9 as "
+      "f64, 'a_string' as str, cast('a_blob' as blob) as blb"));
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+
+  EXPECT_THAT(decoded.columns,
+              ElementsAre("i8", "i16", "i32", "i64", "f64", "str", "blb"));
+  EXPECT_THAT(decoded.cells,
+              ElementsAre(SqlValue::Long(1), SqlValue::Long(128),
+                          SqlValue::Long(100000), SqlValue::Long(42001001001),
+                          SqlValue::Double(1e9), SqlValue::String("a_string"),
+                          SqlValue::Bytes("a_blob", 6)));
+}
+
+TEST(QueryResultSerializerTest, LongBatch) {
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+  RunQueryChecked(tp.get(), "create virtual table win using window;");
+  RunQueryChecked(tp.get(),
+                  "update win set window_start=0, window_dur=8192, quantum=1 "
+                  "where rowid = 0");
+
+  QueryResultSerializer serializer(tp->ExecuteQuery(
+      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win"));
+
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+
+  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
+  ASSERT_EQ(decoded.cells.size(), 4 * 8192u);
+  // Spot-check the cell contents of the first 1024 rows.
+  for (uint32_t row = 0; row < 1024; row++) {
+    const SqlValue* row_cells = &decoded.cells[row * 4];
+    ASSERT_EQ(row_cells[0].type, SqlValue::kString);
+    ASSERT_STREQ(row_cells[0].string_value, "x");
+
+    ASSERT_EQ(row_cells[1].type, SqlValue::kLong);
+    ASSERT_EQ(row_cells[1].long_value, row);
+
+    ASSERT_EQ(row_cells[2].type, SqlValue::kDouble);
+    ASSERT_EQ(row_cells[2].double_value, 1.0);
+
+    ASSERT_EQ(row_cells[3].type, SqlValue::kLong);
+    ASSERT_EQ(row_cells[3].long_value, row);
+  }
+}
+
+TEST(QueryResultSerializerTest, BatchSaturatingBinaryPayload) {
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+  RunQueryChecked(tp.get(), "create virtual table win using window;");
+  RunQueryChecked(tp.get(),
+                  "update win set window_start=0, window_dur=1024, quantum=1 "
+                  "where rowid = 0");
+  // A tiny split threshold (32), so payload size is what cuts the batches.
+  QueryResultSerializer serializer(tp->ExecuteQuery(
+      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win"));
+  serializer.set_batch_size_for_testing(1024, 32);
+
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+
+  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
+  ASSERT_EQ(decoded.cells.size(), 1024 * 4u);
+}
+
+TEST(QueryResultSerializerTest, BatchSaturatingNumCells) {
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+
+  RunQueryChecked(tp.get(), "create virtual table win using window;");
+  RunQueryChecked(tp.get(),
+                  "update win set window_start=0, window_dur=4, quantum=1 "
+                  "where rowid = 0");
+  // 4 rows x 4 columns = 16 cells, matching the 16 cells-per-batch limit.
+  QueryResultSerializer serializer(tp->ExecuteQuery(
+      "select 'x' as x, ts, dur * 1.0 as dur, quantum_ts from win"));
+  serializer.set_batch_size_for_testing(16, 4096);
+
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+
+  ASSERT_THAT(decoded.columns, ElementsAre("x", "ts", "dur", "quantum_ts"));
+  ASSERT_EQ(decoded.cells.size(), 16u);
+}
+
+TEST(QueryResultSerializerTest, LargeStringAndBlobs) {
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+  RunQueryChecked(tp.get(), "create table tab (colz);");
+
+  std::minstd_rand0 rnd_engine(0);
+  std::vector<SqlValue> expected;
+  std::string sql_values;
+  std::deque<std::string> string_buf;  // Needs stable pointers
+  for (size_t n = 0; n < 32; n++) {
+    // Payload of 0, 32, 64 or 96 KB filled with a deterministic A..Y pattern.
+    const size_t len = (rnd_engine() % 4) * 32 * 1024;
+    std::string payload(len, '\0');
+    for (size_t i = 0; i < payload.size(); i++)
+      payload[i] = 'A' + ((n * 11 + i) % 25);
+
+    if (n % 4 == 0) {
+      sql_values += "(NULL),";
+      expected.emplace_back(SqlValue());  // NULL.
+      continue;
+    }
+
+    // One row in four is a blob, the remaining two are plain strings.
+    const bool is_blob = n % 4 == 1;
+    sql_values += is_blob ? "(X'" + base::ToHex(payload) + "'),"
+                          : "('" + payload + "'),";
+    string_buf.emplace_back(std::move(payload));
+    const std::string& stored = string_buf.back();
+    expected.emplace_back(is_blob
+                              ? SqlValue::Bytes(stored.data(), stored.size())
+                              : SqlValue::String(stored.c_str()));
+  }
+  sql_values.pop_back();  // Remove trailing comma.
+  RunQueryChecked(tp.get(), "insert into tab (colz) values " + sql_values);
+
+  QueryResultSerializer serializer(tp->ExecuteQuery("select colz from tab"));
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+  ASSERT_EQ(decoded.cells.size(), expected.size());
+  for (size_t i = 0; i < expected.size(); i++) {
+    EXPECT_EQ(decoded.cells[i], expected[i]) << "Cell " << i;
+  }
+}
+
+TEST(QueryResultSerializerTest, RandomSizes) {
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+  static constexpr uint32_t kNumCells = 3 * 1000;
+
+  RunQueryChecked(tp.get(), "create table tab (a, b, c);");
+  std::vector<SqlValue> expected;
+  expected.reserve(kNumCells);
+  std::deque<std::string> string_buf;  // Needs stable pointers
+  std::minstd_rand0 rnd_engine(0);
+  std::string insert_values;
+  // Generate kNumCells random cells, grouped three per row as "(a,b,c),".
+  for (uint32_t i = 0; i < kNumCells; i++) {
+    const uint32_t col = i % 3;
+    if (col == 0)
+      insert_values += "(";
+    int type = rnd_engine() % 5;
+    if (type == 0) {
+      expected.emplace_back(SqlValue());  // NULL
+      insert_values += "NULL";
+    } else if (type == 1) {
+      expected.emplace_back(SqlValue::Long(static_cast<long>(rnd_engine())));
+      insert_values += std::to_string(expected.back().long_value);
+    } else if (type == 2) {
+      expected.emplace_back(SqlValue::Double(rnd_engine() * 1.0));
+      insert_values += std::to_string(expected.back().double_value);
+    } else if (type == 3 || type == 4) {
+      size_t len = (rnd_engine() % 5) * 32;
+      std::string rndstr;
+      rndstr.resize(len);
+      for (size_t n = 0; n < len; n++)
+        rndstr[n] = static_cast<char>(rnd_engine() % 256);
+      auto rndstr_hex = base::ToHex(rndstr);
+      if (type == 3) {
+        // Standard single quotes; double-quoted strings are an SQLite quirk.
+        insert_values += "'" + rndstr_hex + "'";
+        string_buf.emplace_back(std::move(rndstr_hex));
+        expected.emplace_back(SqlValue::String(string_buf.back().c_str()));
+      } else {
+        insert_values += "X'" + rndstr_hex + "'";
+        string_buf.emplace_back(std::move(rndstr));
+        expected.emplace_back(SqlValue::Bytes(string_buf.back().data(),
+                                              string_buf.back().size()));
+      }
+    }
+
+    if (col < 2) {
+      insert_values += ",";
+    } else {
+      insert_values += "),";
+      if (insert_values.size() > 1024 * 1024 || i == kNumCells - 1) {
+        insert_values[insert_values.size() - 1] = ';';
+        auto query = "insert into tab (a,b,c) values " + insert_values;
+        insert_values = "";
+        RunQueryChecked(tp.get(), query);
+      }
+    }
+  }
+
+  // Serialize and de-serialize with different batch and payload sizes.
+  for (int rep = 0; rep < 10; rep++) {
+    auto iter = tp->ExecuteQuery("select * from tab");
+    QueryResultSerializer ser(std::move(iter));
+    uint32_t cells_per_batch = 1u << (rnd_engine() % 8 + 2);
+    uint32_t binary_payload_size = 1u << (rnd_engine() % 8 + 8);
+    ser.set_batch_size_for_testing(cells_per_batch, binary_payload_size);
+    TestDeserializer deser;
+    deser.SerializeAndDeserialize(&ser);
+    ASSERT_EQ(deser.cells.size(), expected.size());
+    for (size_t i = 0; i < expected.size(); i++) {
+      EXPECT_EQ(deser.cells[i], expected[i]) << "Cell " << i;
+    }
+  }
+}
+
+TEST(QueryResultSerializerTest, ErrorBeforeStartingQuery) {
+  // The statement is incomplete: an error is expected and no cell at all.
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+  QueryResultSerializer serializer(
+      tp->ExecuteQuery("insert into incomplete_input"));
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+  EXPECT_EQ(decoded.cells.size(), 0u);
+  EXPECT_EQ(decoded.error, "incomplete input");
+  EXPECT_TRUE(decoded.eof_reached);
+}
+
+TEST(QueryResultSerializerTest, ErrorAfterSomeResults) {
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+  RunQueryChecked(tp.get(), "create table tab (x)");
+  RunQueryChecked(tp.get(), "insert into tab (x) values (0), (1), ('error')");
+  // Two cells are expected to be decoded before the error is reported.
+  QueryResultSerializer serializer(
+      tp->ExecuteQuery("select str_split('a;b', ';', x) as s from tab"));
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+  EXPECT_NE(decoded.error, "");
+  EXPECT_THAT(decoded.cells,
+              ElementsAre(SqlValue::String("a"), SqlValue::String("b")));
+  EXPECT_TRUE(decoded.eof_reached);
+}
+
+TEST(QueryResultSerializerTest, NoResultQuery) {
+  // A statement yielding no rows must still end with an EOF-marked batch.
+  auto tp = TraceProcessor::CreateInstance(trace_processor::Config());
+  QueryResultSerializer serializer(tp->ExecuteQuery("create table tab (x)"));
+  TestDeserializer decoded;
+  decoded.SerializeAndDeserialize(&serializer);
+  EXPECT_EQ(decoded.error, "");
+  EXPECT_EQ(decoded.cells.size(), 0u);
+  EXPECT_TRUE(decoded.eof_reached);
+}
+
+}  // namespace
+}  // namespace trace_processor
+}  // namespace perfetto