trace_processor: add diff testing for trace processor

Trace processor is notoriously hard to test using unit or integration
tests because its intermediate steps are closely tied to the code and
are implementation-dependent.

Fundamentally, all we really care about is that for a certain set of
traces and queries, we maintain correctness and performance.

This can be done using diff testing, where we take a trace and a query,
run the whole trace processor end-to-end over them, compare the output
to the expected (i.e. known-good) output, and complain if the two
differ.

This CL only addresses the correctness portion. The performance portion
will be tackled in a follow-up.

Bug: 117596113
Change-Id: Idbfe1fc732bea9b3b7a926ef2d99d3c2ac90dc16
diff --git a/src/trace_processor/trace_processor_shell.cc b/src/trace_processor/trace_processor_shell.cc
index c86a82a..9496b27 100644
--- a/src/trace_processor/trace_processor_shell.cc
+++ b/src/trace_processor/trace_processor_shell.cc
@@ -16,10 +16,12 @@
 
 #include <aio.h>
 #include <fcntl.h>
+#include <inttypes.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
 #include <functional>
+#include <iostream>
 
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/logging.h"
@@ -132,7 +134,8 @@
 
 #endif
 
-void OnQueryResult(base::TimeNanos t_start, const protos::RawQueryResult& res) {
+void PrintQueryResultInteractively(base::TimeNanos t_start,
+                                   const protos::RawQueryResult& res) {
   if (res.has_error()) {
     PERFETTO_ELOG("SQLite error: %s", res.error().c_str());
     return;
@@ -164,38 +167,156 @@
     for (int c = 0; c < res.columns_size(); c++) {
       switch (res.column_descriptors(c).type()) {
         case protos::RawQueryResult_ColumnDesc_Type_STRING:
-          printf("%-20.20s ", res.columns(c).string_values(r).c_str());
+          printf("%-20.20s", res.columns(c).string_values(r).c_str());
           break;
         case protos::RawQueryResult_ColumnDesc_Type_DOUBLE:
-          printf("%20f ", res.columns(c).double_values(r));
+          printf("%20f", res.columns(c).double_values(r));
           break;
         case protos::RawQueryResult_ColumnDesc_Type_LONG: {
           auto value = res.columns(c).long_values(r);
-          printf((value < 0xffffffll) ? "%20lld " : "%20llx ", value);
+          printf((value < 0xffffffll) ? "%20lld" : "%20llx", value);
+          break;
+        }
+      }
+      printf(" ");
+    }
+    printf("\n");
+  }
+  printf("\nQuery executed in %.3f ms\n\n", (t_end - t_start).count() / 1E6);
+}
+int StartInteractiveShell() {
+  SetupLineEditor();
 
+  for (;;) {  // Loop until GetLine() returns null or the user enters "q".
+    char* line = GetLine("> ");
+    if (!line || strcmp(line, "q\n") == 0)
+      break;
+    if (strcmp(line, "") == 0)
+      continue;
+    protos::RawQueryArgs query;
+    query.set_sql_query(line);
+    base::TimeNanos t_start = base::GetWallTimeNs();
+    g_tp->ExecuteQuery(query, [t_start](const protos::RawQueryResult& res) {
+      PrintQueryResultInteractively(t_start, res);
+    });
+    // NOTE(review): the break/continue paths above skip this FreeLine().
+    FreeLine(line);
+  }
+  return 0;
+}
+
+void PrintQueryResultAsCsv(const protos::RawQueryResult& res, FILE* output) {
+  PERFETTO_CHECK(res.columns_size() == res.column_descriptors_size());
+  // Writes |res| to |output| as CSV; emits nothing when there are no records.
+  for (int r = 0; r < static_cast<int>(res.num_records()); r++) {
+    if (r == 0) {
+      for (int c = 0; c < res.column_descriptors_size(); c++) {
+        const auto& col = res.column_descriptors(c);
+        if (c > 0)
+          fprintf(output, ",");
+        fprintf(output, "\"%s\"", col.name().c_str());
+      }
+      fprintf(output, "\n");
+    }
+    // One line per record; string cells are quoted but not escaped.
+    for (int c = 0; c < res.columns_size(); c++) {
+      if (c > 0)
+        fprintf(output, ",");
+      switch (res.column_descriptors(c).type()) {
+        case protos::RawQueryResult_ColumnDesc_Type_STRING:
+          fprintf(output, "\"%s\"", res.columns(c).string_values(r).c_str());
+          break;
+        case protos::RawQueryResult_ColumnDesc_Type_DOUBLE:
+          fprintf(output, "%f", res.columns(c).double_values(r));
+          break;
+        case protos::RawQueryResult_ColumnDesc_Type_LONG: {
+          auto value = res.columns(c).long_values(r);
+          fprintf(output, "%lld", value);
           break;
         }
       }
     }
-    printf("\n");
+    fprintf(output, "\n");
   }
-  printf("\nQuery executed in %.3f ms\n\n", (t_end - t_start).count() / 1E6);
+}
+
+int RunQueryAndPrintResult(FILE* input, FILE* output) {
+  char buffer[4096];
+  bool is_first_query = true;
+  bool is_query_error = false;
+  bool has_output_printed = false;
+  while (!feof(input) && !ferror(input) && !is_query_error) {
+    // Add an extra newline separator between query results.
+    if (!is_first_query)
+      fprintf(output, "\n");
+    is_first_query = false;
+    // Queries are separated by blank lines; gather lines up to the next one.
+    std::string sql_query;
+    while (fgets(buffer, sizeof(buffer), input)) {
+      if (strncmp(buffer, "\n", sizeof(buffer)) == 0)
+        break;
+      sql_query.append(buffer);
+    }
+    if (!sql_query.empty() && sql_query.back() == '\n')
+      sql_query.resize(sql_query.size() - 1);
+    PERFETTO_ILOG("Executing query: %s", sql_query.c_str());
+    // NOTE(review): an empty |sql_query| (nothing read) is still executed.
+    protos::RawQueryArgs query;
+    query.set_sql_query(sql_query);
+    g_tp->ExecuteQuery(query, [output, &is_query_error, &has_output_printed](
+                                  const protos::RawQueryResult& res) {
+      if (res.has_error()) {
+        PERFETTO_ELOG("SQLite error: %s", res.error().c_str());
+        is_query_error = true;
+        return;
+      } else if (res.num_records() != 0) {
+        if (has_output_printed) {
+          PERFETTO_ELOG(
+              "More than one query generated result rows. This is "
+              "unsupported.");
+          is_query_error = true;
+          return;
+        }
+        has_output_printed = true;
+      }
+      PrintQueryResultAsCsv(res, output);
+    });
+  }
+  return is_query_error ? 1 : 0;
+}
+
+void PrintUsage(char** argv) {
+  PERFETTO_ELOG("Usage: %s [-d] [-q query.sql] trace_file.proto", argv[0]);
 }
 
 int TraceProcessorMain(int argc, char** argv) {
   if (argc < 2) {
-    PERFETTO_ELOG("Usage: %s [-d] trace_file.proto", argv[0]);
+    PrintUsage(argv);
     return 1;
   }
   const char* trace_file_path = nullptr;
+  const char* query_file_path = nullptr;
   for (int i = 1; i < argc; i++) {
     if (strcmp(argv[i], "-d") == 0) {
       EnableSQLiteVtableDebugging();
       continue;
     }
+    if (strcmp(argv[i], "-q") == 0) {
+      if (++i == argc) {
+        PrintUsage(argv);
+        return 1;
+      }
+      query_file_path = argv[i];
+      continue;
+    }
     trace_file_path = argv[i];
   }
 
+  if (trace_file_path == nullptr) {
+    PrintUsage(argv);
+    return 1;
+  }
+
   // Load the trace file into the trace processor.
   TraceProcessor::Config config;
   config.optimization_mode = OptimizationMode::kMaxBandwidth;
@@ -253,25 +374,19 @@
   signal(SIGINT, [](int) { g_tp->InterruptQuery(); });
 #endif
 
-  SetupLineEditor();
-
-  for (;;) {
-    char* line = GetLine("> ");
-    if (!line || strcmp(line, "q\n") == 0)
-      break;
-    if (strcmp(line, "") == 0)
-      continue;
-    protos::RawQueryArgs query;
-    query.set_sql_query(line);
-    base::TimeNanos t_start = base::GetWallTimeNs();
-    g_tp->ExecuteQuery(query, [t_start](const protos::RawQueryResult& res) {
-      OnQueryResult(t_start, res);
-    });
-
-    FreeLine(line);
+  // If there is no query file, start a shell.
+  if (query_file_path == nullptr) {
+    return StartInteractiveShell();
   }
 
-  return 0;
+  // Otherwise run the queries and print the results.
+  base::ScopedFstream file(fopen(query_file_path, "r"));
+  if (!file.get()) {
+    // fopen() failed; bail out instead of handing a null FILE* to feof().
+    PERFETTO_ELOG("Could not open query file (path: %s)", query_file_path);
+    return 1;
+  }
+  return RunQueryAndPrintResult(file.get(), stdout);
 }
 
 }  // namespace
diff --git a/test/trace_processor/android_sched_and_ps_smoke.out b/test/trace_processor/android_sched_and_ps_smoke.out
new file mode 100644
index 0000000..d5d80e1
--- /dev/null
+++ b/test/trace_processor/android_sched_and_ps_smoke.out
@@ -0,0 +1,25 @@
+"ts","cpu","dur","utid"
+81473010031230,2,78021,1
+81473010109251,2,12500,0
+81473010121751,2,58021,2
+81473010179772,2,24114,0
+81473010203886,2,30834,3
+81473010234720,2,43802,0
+81473010278522,2,29948,4
+81473010308470,2,44322,0
+81473010341386,1,158854,5
+81473010352792,2,32917,6
+
+
+
+"ts","dur","cpu","utid","rowid","quantum","window_start","window_dur","quantum_ts"
+81473010031230,78021,2,1,2,0,0,-1,0
+81473010109251,12500,2,0,2,0,0,-1,0
+81473010121751,58021,2,2,2,0,0,-1,0
+81473010179772,24114,2,0,2,0,0,-1,0
+81473010203886,30834,2,3,2,0,0,-1,0
+81473010234720,43802,2,0,2,0,0,-1,0
+81473010278522,29948,2,4,2,0,0,-1,0
+81473010308470,44322,2,0,2,0,0,-1,0
+81473010341386,158854,1,5,1,0,0,-1,0
+81473010352792,32917,2,6,2,0,0,-1,0
diff --git a/test/trace_processor/index b/test/trace_processor/index
new file mode 100644
index 0000000..6854209
--- /dev/null
+++ b/test/trace_processor/index
@@ -0,0 +1 @@
+../data/android_sched_and_ps.pb smoke.sql android_sched_and_ps_smoke.out
diff --git a/test/trace_processor/smoke.sql b/test/trace_processor/smoke.sql
new file mode 100644
index 0000000..dd760b4
--- /dev/null
+++ b/test/trace_processor/smoke.sql
@@ -0,0 +1,7 @@
+SELECT * from sched limit 10;
+
+SELECT * from counters limit 10;
+
+CREATE VIRTUAL TABLE sp USING span(sched, window, cpu);
+
+SELECT * from sp order by ts limit 10;
diff --git a/tools/diff_test_trace_processor.py b/tools/diff_test_trace_processor.py
new file mode 100755
index 0000000..f239440
--- /dev/null
+++ b/tools/diff_test_trace_processor.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+# Copyright (C) 2018 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import difflib
+import glob
+import os
+import subprocess
+import sys
+
+ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+TEST_DATA_DIR = os.path.join(ROOT_DIR, "test", "trace_processor")
+
+def main():
+  """Runs every test listed in the index file and diffs actual vs expected.
+
+  Each non-empty index line names a trace file, a query file and an expected
+  output file, all relative to the directory containing the index. The trace
+  processor binary is run as `<binary> -q <query> <trace>` and its stdout is
+  compared with the contents of the expected file.
+
+  Returns:
+    0 if every test matched; 1 if any test failed or a listed file is missing.
+  """
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--index', type=str, help='location of index file',
+                      default=os.path.join(TEST_DATA_DIR, "index"))
+  parser.add_argument('trace_processor', type=str,
+                      help='location of trace processor binary')
+  args = parser.parse_args()
+
+  with open(args.index, 'r') as index_file:
+    index_lines = index_file.readlines()
+
+  test_failure = 0
+  index_dir = os.path.dirname(args.index)
+  for index_line in index_lines:
+    # Tolerate blank lines (e.g. a trailing newline at the end of the index).
+    if not index_line.strip():
+      continue
+    [trace_fname, query_fname, expected_fname] = index_line.strip().split(' ')
+
+    trace_path = os.path.abspath(os.path.join(index_dir, trace_fname))
+    query_path = os.path.abspath(os.path.join(index_dir, query_fname))
+    expected_path = os.path.abspath(os.path.join(index_dir, expected_fname))
+    if not os.path.exists(trace_path):
+      print("Trace file not found {}".format(trace_path))
+      return 1
+    elif not os.path.exists(query_path):
+      print("Query file not found {}".format(query_path))
+      return 1
+    elif not os.path.exists(expected_path):
+      print("Expected file not found {}".format(expected_path))
+      return 1
+
+    try:
+      actual_raw = subprocess.check_output([
+        args.trace_processor,
+        '-q',
+        query_path,
+        trace_path
+      ])
+    except subprocess.CalledProcessError as e:
+      # A non-zero exit (e.g. a SQL error) fails this test only; keep going so
+      # the remaining tests still run and the failure total is reported.
+      sys.stderr.write(
+        "Trace processor exited with code {} for trace {} and query {}\n"
+        .format(e.returncode, trace_path, query_path))
+      test_failure += 1
+      continue
+    actual = actual_raw.decode("utf-8")
+
+    with open(expected_path, "r") as expected_file:
+      expected = expected_file.read()
+    if expected != actual:
+      sys.stderr.write(
+        "Expected did not match actual for trace {} and query {}\n"
+        .format(trace_path, query_path))
+
+      # Split the decoded text (not the raw bytes) so both sides of the diff
+      # are the same string type.
+      expected_lines = expected.splitlines(True)
+      actual_lines = actual.splitlines(True)
+      diff = difflib.unified_diff(expected_lines, actual_lines,
+                                  fromfile="expected", tofile="actual")
+      for diff_line in diff:
+        sys.stderr.write(diff_line)
+      test_failure += 1
+
+  if test_failure == 0:
+    print("All tests passed successfully")
+    return 0
+  else:
+    print("Total failures: {}".format(test_failure))
+    return 1
+
+if __name__ == '__main__':
+  sys.exit(main())