trace_processor: overhaul comparision in db tables

This CL overhauls comparisions in all the db tables by centralising the
logic of comparing between values into a single file. By doing this, it
is easier to be both internally consistent (as all comparisions should
call through to one of the comparision functions) and consistent with
SQLite (as we can easily audit that the behaviour matches).

We also make SqlValue comparisions consistent with the strongly typed
comparisions which had drifted out of sync after the code to compare
between doubles and longs was added.

Context: go/perfetto-tp-refactor
Bug: 135177627
Change-Id: I1f9131de872128b5a1c3b08592ede99d2a879a70
diff --git a/Android.bp b/Android.bp
index 52d99ef..5078d34 100644
--- a/Android.bp
+++ b/Android.bp
@@ -5645,6 +5645,7 @@
   name: "perfetto_src_trace_processor_db_unittests",
   srcs: [
     "src/trace_processor/db/bit_vector_unittest.cc",
+    "src/trace_processor/db/compare_unittest.cc",
     "src/trace_processor/db/row_map_unittest.cc",
     "src/trace_processor/db/sparse_vector_unittest.cc",
   ],
@@ -5808,7 +5809,6 @@
   name: "perfetto_src_trace_processor_unittests",
   srcs: [
     "src/trace_processor/args_table_unittest.cc",
-    "src/trace_processor/basic_types_unittest.cc",
     "src/trace_processor/clock_tracker_unittest.cc",
     "src/trace_processor/event_tracker_unittest.cc",
     "src/trace_processor/filtered_row_index_unittest.cc",
diff --git a/BUILD b/BUILD
index e670164..1dde88e 100644
--- a/BUILD
+++ b/BUILD
@@ -582,6 +582,7 @@
         "src/trace_processor/db/bit_vector_iterators.h",
         "src/trace_processor/db/column.cc",
         "src/trace_processor/db/column.h",
+        "src/trace_processor/db/compare.h",
         "src/trace_processor/db/row_map.cc",
         "src/trace_processor/db/row_map.h",
         "src/trace_processor/db/sparse_vector.h",
diff --git a/include/perfetto/trace_processor/basic_types.h b/include/perfetto/trace_processor/basic_types.h
index fdab153..db67e79 100644
--- a/include/perfetto/trace_processor/basic_types.h
+++ b/include/perfetto/trace_processor/basic_types.h
@@ -84,42 +84,6 @@
     return double_value;
   }
 
-  int64_t Compare(const SqlValue& value) const {
-    // TODO(lalitm): this is almost the same as what SQLite does with the
-    // exception of comparisions between long and double - we choose (for
-    // performance reasons) to omit comparisions between them.
-    if (type != value.type)
-      return type - value.type;
-
-    switch (type) {
-      case Type::kNull:
-        return 0;
-      case Type::kLong:
-        return long_value - value.long_value;
-      case Type::kDouble: {
-        return double_value < value.double_value
-                   ? -1
-                   : (double_value > value.double_value ? 1 : 0);
-      }
-      case Type::kString:
-        return strcmp(string_value, value.string_value);
-      case Type::kBytes: {
-        size_t bytes = std::min(bytes_count, value.bytes_count);
-        int ret = memcmp(bytes_value, value.bytes_value, bytes);
-        if (ret != 0)
-          return ret;
-        return static_cast<int64_t>(bytes_count - value.bytes_count);
-      }
-    }
-    PERFETTO_FATAL("For GCC");
-  }
-  bool operator==(const SqlValue& value) const { return Compare(value) == 0; }
-  bool operator<(const SqlValue& value) const { return Compare(value) < 0; }
-  bool operator!=(const SqlValue& value) const { return !(*this == value); }
-  bool operator>=(const SqlValue& value) const { return !(*this < value); }
-  bool operator<=(const SqlValue& value) const { return !(value < *this); }
-  bool operator>(const SqlValue& value) const { return value < *this; }
-
   bool is_null() const { return type == Type::kNull; }
 
   // Up to 1 of these fields can be accessed depending on |type|.
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index 1f33a80..b3ec9a9 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -400,7 +400,6 @@
 perfetto_unittest_source_set("unittests") {
   testonly = true
   sources = [
-    "basic_types_unittest.cc",
     "clock_tracker_unittest.cc",
     "event_tracker_unittest.cc",
     "forwarding_trace_parser_unittest.cc",
diff --git a/src/trace_processor/basic_types_unittest.cc b/src/trace_processor/basic_types_unittest.cc
deleted file mode 100644
index a2a0fd2..0000000
--- a/src/trace_processor/basic_types_unittest.cc
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (C) 2019 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "perfetto/trace_processor/basic_types.h"
-#include "test/gtest_and_gmock.h"
-
-namespace perfetto {
-namespace trace_processor {
-namespace {
-
-TEST(SqlValueTest, DifferentTypes) {
-  ASSERT_LT(SqlValue(), SqlValue::Long(10));
-  ASSERT_LT(SqlValue::Long(10), SqlValue::Double(10.0));
-  ASSERT_LT(SqlValue::Double(10.0), SqlValue::String("10"));
-}
-
-TEST(SqlValueTest, CompareLong) {
-  SqlValue int32_min = SqlValue::Long(std::numeric_limits<int32_t>::min());
-  SqlValue minus_1 = SqlValue::Long(-1);
-  SqlValue zero = SqlValue::Long(0);
-  SqlValue uint32_max = SqlValue::Long(std::numeric_limits<uint32_t>::max());
-
-  ASSERT_LT(int32_min, minus_1);
-  ASSERT_LT(int32_min, uint32_max);
-  ASSERT_LT(minus_1, uint32_max);
-
-  ASSERT_GT(uint32_max, zero);
-
-  ASSERT_EQ(zero, zero);
-}
-
-TEST(SqlValueTest, CompareDouble) {
-  SqlValue int32_min = SqlValue::Double(std::numeric_limits<int32_t>::min());
-  SqlValue minus_1 = SqlValue::Double(-1.0);
-  SqlValue zero = SqlValue::Double(0);
-  SqlValue uint32_max = SqlValue::Double(std::numeric_limits<uint32_t>::max());
-
-  ASSERT_LT(int32_min, minus_1);
-  ASSERT_LT(int32_min, uint32_max);
-  ASSERT_LT(minus_1, uint32_max);
-
-  ASSERT_GT(uint32_max, zero);
-
-  ASSERT_EQ(zero, zero);
-}
-
-TEST(SqlValueTest, CompareString) {
-  SqlValue a = SqlValue::String("a");
-  SqlValue aa = SqlValue::String("aa");
-  SqlValue b = SqlValue::String("b");
-
-  ASSERT_LT(a, aa);
-  ASSERT_LT(aa, b);
-  ASSERT_LT(a, b);
-
-  ASSERT_GT(aa, a);
-
-  ASSERT_EQ(a, a);
-  ASSERT_EQ(aa, SqlValue::String("aa"));
-}
-
-}  // namespace
-}  // namespace trace_processor
-}  // namespace perfetto
diff --git a/src/trace_processor/db/BUILD.gn b/src/trace_processor/db/BUILD.gn
index 57366d2..30e2518 100644
--- a/src/trace_processor/db/BUILD.gn
+++ b/src/trace_processor/db/BUILD.gn
@@ -22,6 +22,7 @@
     "bit_vector_iterators.h",
     "column.cc",
     "column.h",
+    "compare.h",
     "row_map.cc",
     "row_map.h",
     "sparse_vector.h",
@@ -42,6 +43,7 @@
   testonly = true
   sources = [
     "bit_vector_unittest.cc",
+    "compare_unittest.cc",
     "row_map_unittest.cc",
     "sparse_vector_unittest.cc",
   ]
diff --git a/src/trace_processor/db/column.cc b/src/trace_processor/db/column.cc
index e06258c..fd9f6b2 100644
--- a/src/trace_processor/db/column.cc
+++ b/src/trace_processor/db/column.cc
@@ -16,40 +16,12 @@
 
 #include "src/trace_processor/db/column.h"
 
+#include "src/trace_processor/db/compare.h"
 #include "src/trace_processor/db/table.h"
 
 namespace perfetto {
 namespace trace_processor {
 
-namespace {
-
-// This code mathces the behaviour of sqlite3IntFloatCompare to ensure that
-// we are consistent with SQLite.
-int CompareIntToDouble(int64_t i, double d) {
-  // First check if we are out of range for a int64_t. We use the constants
-  // directly instead of using numeric_limits as the casts introduces rounding
-  // in the doubles as a double cannot exactly represent int64::max().
-  if (d >= 9223372036854775808.0)
-    return -1;
-  if (d < -9223372036854775808.0)
-    return 1;
-
-  // Then, try to compare in int64 space to try and keep as much precision as
-  // possible.
-  int64_t d_i = static_cast<int64_t>(d);
-  if (i < d_i)
-    return -1;
-  if (i > d_i)
-    return 1;
-
-  // Finally, try and compare in double space, sacrificing precision if
-  // necessary.
-  double i_d = static_cast<double>(i);
-  return (i_d < d) ? -1 : (i_d > d ? 1 : 0);
-}
-
-}  // namespace
-
 Column::Column(const Column& column,
                Table* table,
                uint32_t col_idx,
@@ -168,7 +140,10 @@
     double double_value = value.double_value;
     if (std::is_same<T, double>::value) {
       auto fn = [double_value](T v) {
-        return v < double_value ? -1 : (v > double_value ? 1 : 0);
+        // We static cast here as this code will be compiled even when T ==
+        // int64_t as we don't have if constexpr in C++11. In reality the cast
+        // is a noop but we cannot statically verify that for the compiler.
+        return compare::Numeric(static_cast<double>(v), double_value);
       };
       FilterIntoNumericWithComparatorSlow<T, is_nullable>(op, rm, fn);
     } else {
@@ -176,7 +151,7 @@
         // We static cast here as this code will be compiled even when T ==
         // double as we don't have if constexpr in C++11. In reality the cast is
         // a noop but we cannot statically verify that for the compiler.
-        return CompareIntToDouble(static_cast<int64_t>(v), double_value);
+        return compare::LongToDouble(static_cast<int64_t>(v), double_value);
       };
       FilterIntoNumericWithComparatorSlow<T, is_nullable>(op, rm, fn);
     }
@@ -188,12 +163,15 @@
         // for this function even though the LHS of the comparator should
         // actually be |v|. This saves us having a duplicate implementation of
         // the comparision function.
-        return -CompareIntToDouble(long_value, v);
+        return -compare::LongToDouble(long_value, v);
       };
       FilterIntoNumericWithComparatorSlow<T, is_nullable>(op, rm, fn);
     } else {
       auto fn = [long_value](T v) {
-        return v < long_value ? -1 : (v > long_value ? 1 : 0);
+        // We static cast here as this code will be compiled even when T ==
+        // double as we don't have if constexpr in C++11. In reality the cast is
+        // a noop but we cannot statically verify that for the compiler.
+        return compare::Numeric(static_cast<int64_t>(v), long_value);
       };
       FilterIntoNumericWithComparatorSlow<T, is_nullable>(op, rm, fn);
     }
@@ -295,41 +273,43 @@
   }
 
   NullTermStringView str_value = value.string_value;
+  PERFETTO_DCHECK(str_value.data() != nullptr);
+
   switch (op) {
     case FilterOp::kLt:
       row_map().FilterInto(rm, [this, str_value](uint32_t idx) {
         auto v = GetStringPoolStringAtIdx(idx);
-        return v.data() != nullptr && v < str_value;
+        return v.data() != nullptr && compare::String(v, str_value) < 0;
       });
       break;
     case FilterOp::kEq:
       row_map().FilterInto(rm, [this, str_value](uint32_t idx) {
         auto v = GetStringPoolStringAtIdx(idx);
-        return v.data() != nullptr && v == str_value;
+        return v.data() != nullptr && compare::String(v, str_value) == 0;
       });
       break;
     case FilterOp::kGt:
       row_map().FilterInto(rm, [this, str_value](uint32_t idx) {
         auto v = GetStringPoolStringAtIdx(idx);
-        return v.data() != nullptr && v > str_value;
+        return v.data() != nullptr && compare::String(v, str_value) > 0;
       });
       break;
     case FilterOp::kNe:
       row_map().FilterInto(rm, [this, str_value](uint32_t idx) {
         auto v = GetStringPoolStringAtIdx(idx);
-        return v.data() != nullptr && v != str_value;
+        return v.data() != nullptr && compare::String(v, str_value) != 0;
       });
       break;
     case FilterOp::kLe:
       row_map().FilterInto(rm, [this, str_value](uint32_t idx) {
         auto v = GetStringPoolStringAtIdx(idx);
-        return v.data() != nullptr && v <= str_value;
+        return v.data() != nullptr && compare::String(v, str_value) <= 0;
       });
       break;
     case FilterOp::kGe:
       row_map().FilterInto(rm, [this, str_value](uint32_t idx) {
         auto v = GetStringPoolStringAtIdx(idx);
-        return v.data() != nullptr && v >= str_value;
+        return v.data() != nullptr && compare::String(v, str_value) >= 0;
       });
       break;
     case FilterOp::kLike:
@@ -363,28 +343,34 @@
   uint32_t id_value = static_cast<uint32_t>(value.long_value);
   switch (op) {
     case FilterOp::kLt:
-      row_map().FilterInto(rm,
-                           [id_value](uint32_t idx) { return idx < id_value; });
+      row_map().FilterInto(rm, [id_value](uint32_t idx) {
+        return compare::Numeric(idx, id_value) < 0;
+      });
       break;
     case FilterOp::kEq:
-      row_map().FilterInto(
-          rm, [id_value](uint32_t idx) { return idx == id_value; });
+      row_map().FilterInto(rm, [id_value](uint32_t idx) {
+        return compare::Numeric(idx, id_value) == 0;
+      });
       break;
     case FilterOp::kGt:
-      row_map().FilterInto(rm,
-                           [id_value](uint32_t idx) { return idx > id_value; });
+      row_map().FilterInto(rm, [id_value](uint32_t idx) {
+        return compare::Numeric(idx, id_value) > 0;
+      });
       break;
     case FilterOp::kNe:
-      row_map().FilterInto(
-          rm, [id_value](uint32_t idx) { return idx != id_value; });
+      row_map().FilterInto(rm, [id_value](uint32_t idx) {
+        return compare::Numeric(idx, id_value) != 0;
+      });
       break;
     case FilterOp::kLe:
-      row_map().FilterInto(
-          rm, [id_value](uint32_t idx) { return idx <= id_value; });
+      row_map().FilterInto(rm, [id_value](uint32_t idx) {
+        return compare::Numeric(idx, id_value) <= 0;
+      });
       break;
     case FilterOp::kGe:
-      row_map().FilterInto(
-          rm, [id_value](uint32_t idx) { return idx >= id_value; });
+      row_map().FilterInto(rm, [id_value](uint32_t idx) {
+        return compare::Numeric(idx, id_value) >= 0;
+      });
       break;
     case FilterOp::kLike:
       rm->Intersect(RowMap());
@@ -400,33 +386,33 @@
   switch (type_) {
     case ColumnType::kInt32: {
       if (IsNullable()) {
-        StableSort<desc, int32_t, true /* is_nullable */>(out);
+        StableSortNumeric<desc, int32_t, true /* is_nullable */>(out);
       } else {
-        StableSort<desc, int32_t, false /* is_nullable */>(out);
+        StableSortNumeric<desc, int32_t, false /* is_nullable */>(out);
       }
       break;
     }
     case ColumnType::kUint32: {
       if (IsNullable()) {
-        StableSort<desc, uint32_t, true /* is_nullable */>(out);
+        StableSortNumeric<desc, uint32_t, true /* is_nullable */>(out);
       } else {
-        StableSort<desc, uint32_t, false /* is_nullable */>(out);
+        StableSortNumeric<desc, uint32_t, false /* is_nullable */>(out);
       }
       break;
     }
     case ColumnType::kInt64: {
       if (IsNullable()) {
-        StableSort<desc, int64_t, true /* is_nullable */>(out);
+        StableSortNumeric<desc, int64_t, true /* is_nullable */>(out);
       } else {
-        StableSort<desc, int64_t, false /* is_nullable */>(out);
+        StableSortNumeric<desc, int64_t, false /* is_nullable */>(out);
       }
       break;
     }
     case ColumnType::kDouble: {
       if (IsNullable()) {
-        StableSort<desc, double, true /* is_nullable */>(out);
+        StableSortNumeric<desc, double, true /* is_nullable */>(out);
       } else {
-        StableSort<desc, double, false /* is_nullable */>(out);
+        StableSortNumeric<desc, double, false /* is_nullable */>(out);
       }
       break;
     }
@@ -434,19 +420,22 @@
       row_map().StableSort(out, [this](uint32_t a_idx, uint32_t b_idx) {
         auto a_str = GetStringPoolStringAtIdx(a_idx);
         auto b_str = GetStringPoolStringAtIdx(b_idx);
-        return desc ? b_str < a_str : a_str < b_str;
+
+        int res = compare::NullableString(a_str, b_str);
+        return desc ? res > 0 : res < 0;
       });
       break;
     }
     case ColumnType::kId:
       row_map().StableSort(out, [](uint32_t a_idx, uint32_t b_idx) {
-        return desc ? b_idx < a_idx : a_idx < b_idx;
+        int res = compare::Numeric(a_idx, b_idx);
+        return desc ? res > 0 : res < 0;
       });
   }
 }
 
 template <bool desc, typename T, bool is_nullable>
-void Column::StableSort(std::vector<uint32_t>* out) const {
+void Column::StableSortNumeric(std::vector<uint32_t>* out) const {
   PERFETTO_DCHECK(IsNullable() == is_nullable);
   PERFETTO_DCHECK(ToColumnType<T>() == type_);
 
@@ -455,11 +444,15 @@
     if (is_nullable) {
       auto a_val = sv.Get(a_idx);
       auto b_val = sv.Get(b_idx);
-      return desc ? b_val < a_val : a_val < b_val;
+
+      int res = compare::NullableNumeric(a_val, b_val);
+      return desc ? res > 0 : res < 0;
     }
     auto a_val = sv.GetNonNull(a_idx);
     auto b_val = sv.GetNonNull(b_idx);
-    return desc ? b_val < a_val : a_val < b_val;
+
+    int res = compare::Numeric(a_val, b_val);
+    return desc ? res > 0 : res < 0;
   });
 }
 
diff --git a/src/trace_processor/db/column.h b/src/trace_processor/db/column.h
index 0117539..0e3d927 100644
--- a/src/trace_processor/db/column.h
+++ b/src/trace_processor/db/column.h
@@ -22,6 +22,7 @@
 #include "perfetto/base/logging.h"
 #include "perfetto/ext/base/optional.h"
 #include "perfetto/trace_processor/basic_types.h"
+#include "src/trace_processor/db/compare.h"
 #include "src/trace_processor/db/row_map.h"
 #include "src/trace_processor/db/sparse_vector.h"
 #include "src/trace_processor/string_pool.h"
@@ -131,7 +132,7 @@
       case ColumnType::kDouble:
       case ColumnType::kString: {
         for (uint32_t i = 0; i < row_map().size(); i++) {
-          if (Get(i) == value)
+          if (compare::SqlValue(Get(i), value) == 0)
             return i;
         }
         return base::nullopt;
@@ -389,28 +390,34 @@
     Iterator e(this, row_map().size());
     switch (op) {
       case FilterOp::kEq: {
-        uint32_t beg = std::distance(b, std::lower_bound(b, e, value));
-        uint32_t end = std::distance(b, std::upper_bound(b, e, value));
+        uint32_t beg = std::distance(
+            b, std::lower_bound(b, e, value, &compare::SqlValueComparator));
+        uint32_t end = std::distance(
+            b, std::upper_bound(b, e, value, &compare::SqlValueComparator));
         rm->Intersect(RowMap(beg, end));
         return true;
       }
       case FilterOp::kLe: {
-        uint32_t end = std::distance(b, std::upper_bound(b, e, value));
+        uint32_t end = std::distance(
+            b, std::upper_bound(b, e, value, &compare::SqlValueComparator));
         rm->Intersect(RowMap(0, end));
         return true;
       }
       case FilterOp::kLt: {
-        uint32_t end = std::distance(b, std::lower_bound(b, e, value));
+        uint32_t end = std::distance(
+            b, std::lower_bound(b, e, value, &compare::SqlValueComparator));
         rm->Intersect(RowMap(0, end));
         return true;
       }
       case FilterOp::kGe: {
-        uint32_t beg = std::distance(b, std::lower_bound(b, e, value));
+        uint32_t beg = std::distance(
+            b, std::lower_bound(b, e, value, &compare::SqlValueComparator));
         rm->Intersect(RowMap(beg, row_map().size()));
         return true;
       }
       case FilterOp::kGt: {
-        uint32_t beg = std::distance(b, std::upper_bound(b, e, value));
+        uint32_t beg = std::distance(
+            b, std::upper_bound(b, e, value, &compare::SqlValueComparator));
         rm->Intersect(RowMap(beg, row_map().size()));
         return true;
       }
@@ -450,7 +457,7 @@
   // Stable sorts this column storing the result in |out|.
   // |T| and |is_nullable| should match the type and nullability of this column.
   template <bool desc, typename T, bool is_nullable>
-  void StableSort(std::vector<uint32_t>* out) const;
+  void StableSortNumeric(std::vector<uint32_t>* out) const;
 
   template <typename T>
   static ColumnType ToColumnType() {
diff --git a/src/trace_processor/db/compare.h b/src/trace_processor/db/compare.h
new file mode 100644
index 0000000..9f3f9a5
--- /dev/null
+++ b/src/trace_processor/db/compare.h
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_DB_COMPARE_H_
+#define SRC_TRACE_PROCESSOR_DB_COMPARE_H_
+
+#include <stdint.h>
+
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/string_view.h"
+#include "perfetto/trace_processor/basic_types.h"
+
+namespace perfetto {
+namespace trace_processor {
+namespace compare {
+
+// This file contains the de-facto impleemntation of all comparisions used by
+// trace processor in every setting. All of this is centralised in one file to
+// ensure both internal consistency with SQLite and consistency with SQLite.
+
+// Compare a non-null numeric to a double; returns:
+//  * <0 if i < d,
+//  * >0 if i > d.
+//  *  0 otherwise
+// This code matches the behaviour of sqlite3IntFloatCompare.
+inline int LongToDouble(int64_t i, double d) {
+  // First check if we are out of range for a int64_t. We use the constants
+  // directly instead of using numeric_limits as the casts introduces rounding
+  // in the doubles as a double cannot exactly represent int64::max().
+  if (d >= 9223372036854775808.0)
+    return -1;
+  if (d < -9223372036854775808.0)
+    return 1;
+
+  // Then, try to compare in int64 space to try and keep as much precision as
+  // possible.
+  int64_t d_i = static_cast<int64_t>(d);
+  if (i < d_i)
+    return -1;
+  if (i > d_i)
+    return 1;
+
+  // Finally, try and compare in double space, sacrificing precision if
+  // necessary.
+  double i_d = static_cast<double>(i);
+  return (i_d < d) ? -1 : (i_d > d ? 1 : 0);
+}
+
+// Compares two non-null numeric values; returns:
+//  * <0 if a < b,
+//  * >0 if a > b.
+//  *  0 otherwise
+// This code matches the behaviour of the inline code in the comparision path of
+// sqlite3VdbeExec (for ints) and the behaviour of sqlite3MemCompare (for
+// doubles).
+template <typename T>
+inline int Numeric(T a, T b) {
+  static_assert(std::is_arithmetic<T>::value,
+                "Numeric comparision performed with non-numeric type");
+  return a < b ? -1 : (a > b ? 1 : 0);
+}
+
+// Compares two non-null bytes values; returns:
+//  * <0 if a < b,
+//  * >0 if a > b.
+//  *  0 otherwise
+// This code matches the behaviour of sqlite3BlobCompare.
+inline int Bytes(const void* a, size_t a_n, const void* b, size_t b_n) {
+  int res = memcmp(a, b, std::min(a_n, b_n));
+  return res != 0 ? res
+                  : static_cast<int>(static_cast<int64_t>(a_n) -
+                                     static_cast<int64_t>(b_n));
+}
+
+// Compares two non-null string values; returns:
+//  * <0 if a < b,
+//  * >0 if a > b.
+//  *  0 otherwise
+// This code matches the behaviour of sqlite3BlobCompare which is called when
+// there is no collation sequence defined in sqlite3MemCompare.
+inline int String(base::StringView a, base::StringView b) {
+  PERFETTO_DCHECK(a.data() != nullptr);
+  PERFETTO_DCHECK(b.data() != nullptr);
+  return Bytes(a.data(), a.size(), b.data(), b.size());
+}
+
+// Compares two nullable numeric values; returns:
+//  *  0 if both a and b are null
+//  * <0 if a is null and b is non null
+//  * >0 if a is non null and b is null
+//  * <0 if a < b (a and b both non null)
+//  * >0 if a > b (a and b both non null)
+//  *  0 otherwise
+// Should only be used for defining an ordering on value of type T. For filter
+// functions, compare::Numeric should be used above after checking if the value
+// is null.
+// This method was defined from observing the behaviour of SQLite when sorting
+// on columns containing nulls.
+template <typename T>
+inline int NullableNumeric(base::Optional<T> a, base::Optional<T> b) {
+  if (!a)
+    return b ? -1 : 0;
+
+  if (!b)
+    return 1;
+
+  return compare::Numeric(*a, *b);
+}
+
+// Compares two strings, either of which can be null; returns:
+//  *  0 if both a and b are null
+//  * <0 if a is null and b is non null
+//  * >0 if a is non null and b is null
+//  * <0 if a < b (a and b both non null)
+//  * >0 if a > b (a and b both non null)
+//  *  0 otherwise
+// Should only be used for defining an ordering on value of type T. For filter
+// functions, compare::String should be used above after checking if the value
+// is null.
+// This method was defined from observing the behaviour of SQLite when sorting
+// on columns containing nulls.
+inline int NullableString(base::StringView a, base::StringView b) {
+  if (!a.data())
+    return b.data() ? -1 : 0;
+
+  if (!b.data())
+    return 1;
+
+  return compare::String(a, b);
+}
+
+// Compares two SqlValue; returns:
+//  * <0 if a.type < b.type and a and b are not both numeric
+//  * >0 if a.type > b.type and a and b are not both numeric
+//  * <0 if a < b (a and b both non null and either of same type or numeric)
+//  * >0 if a > b (a and b both non null and either of same type or numeric)
+//  *  0 otherwise
+// This code roughly matches the behaviour of the code in the comparision path
+// of sqlite3VdbeExec except we are intentionally more strict than SQLite when
+// it comes to casting between strings and numerics - we disallow moving between
+// the two. We do allow comparing between doubles and longs however as doubles
+// can easily appear by using constants in SQL even when intending to use longs.
+inline int SqlValue(const SqlValue& a, const SqlValue& b) {
+  if (a.type != b.type) {
+    if (a.type == SqlValue::kLong && b.type == SqlValue::kDouble)
+      return compare::LongToDouble(a.long_value, b.double_value);
+
+    if (a.type == SqlValue::kDouble && b.type == SqlValue::kLong)
+      return -compare::LongToDouble(b.long_value, a.double_value);
+
+    return a.type - b.type;
+  }
+
+  switch (a.type) {
+    case SqlValue::Type::kLong:
+      return compare::Numeric(a.long_value, b.long_value);
+    case SqlValue::Type::kDouble:
+      return compare::Numeric(a.double_value, b.double_value);
+    case SqlValue::Type::kString:
+      return compare::String(a.string_value, b.string_value);
+    case SqlValue::Type::kBytes:
+      return compare::Bytes(a.bytes_value, a.bytes_count, b.bytes_value,
+                            b.bytes_count);
+    case SqlValue::Type::kNull:
+      return 0;
+  }
+  PERFETTO_FATAL("For GCC");
+}
+
+// Implements a comparator for SqlValues to use for std algorithms.
+// See documentation of compare::SqlValue for details of how this function
+// works.
+inline int SqlValueComparator(const trace_processor::SqlValue& a,
+                              const trace_processor::SqlValue& b) {
+  return compare::SqlValue(a, b) < 0;
+}
+
+}  // namespace compare
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_DB_COMPARE_H_
diff --git a/src/trace_processor/db/compare_unittest.cc b/src/trace_processor/db/compare_unittest.cc
new file mode 100644
index 0000000..190fb9a
--- /dev/null
+++ b/src/trace_processor/db/compare_unittest.cc
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/db/compare.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace trace_processor {
+namespace {
+
+TEST(CompareTest, SqlValueDifferentTypes) {
+  ASSERT_LT(compare::SqlValue(SqlValue(), SqlValue::Long(10)), 0);
+  ASSERT_LT(compare::SqlValue(SqlValue::Double(10.0), SqlValue::String("10")),
+            0);
+
+  // Numerics should still compare equal even if they have different types.
+  ASSERT_EQ(compare::SqlValue(SqlValue::Long(10), SqlValue::Double(10.0)), 0);
+}
+
+TEST(CompareTest, SqlValueLong) {
+  SqlValue int32_min = SqlValue::Long(std::numeric_limits<int32_t>::min());
+  SqlValue minus_1 = SqlValue::Long(-1);
+  SqlValue zero = SqlValue::Long(0);
+  SqlValue uint32_max = SqlValue::Long(std::numeric_limits<uint32_t>::max());
+
+  ASSERT_LT(compare::SqlValue(int32_min, minus_1), 0);
+  ASSERT_LT(compare::SqlValue(int32_min, uint32_max), 0);
+  ASSERT_LT(compare::SqlValue(minus_1, uint32_max), 0);
+
+  ASSERT_GT(compare::SqlValue(uint32_max, zero), 0);
+
+  ASSERT_EQ(compare::SqlValue(zero, zero), 0);
+}
+
+TEST(CompareTest, SqlValueDouble) {
+  SqlValue int32_min = SqlValue::Double(std::numeric_limits<int32_t>::min());
+  SqlValue minus_1 = SqlValue::Double(-1.0);
+  SqlValue zero = SqlValue::Double(0);
+  SqlValue uint32_max = SqlValue::Double(std::numeric_limits<uint32_t>::max());
+
+  ASSERT_LT(compare::SqlValue(int32_min, minus_1), 0);
+  ASSERT_LT(compare::SqlValue(int32_min, uint32_max), 0);
+  ASSERT_LT(compare::SqlValue(minus_1, uint32_max), 0);
+
+  ASSERT_GT(compare::SqlValue(uint32_max, zero), 0);
+
+  ASSERT_EQ(compare::SqlValue(zero, zero), 0);
+}
+
+TEST(CompareTest, SqlValueString) {
+  SqlValue a = SqlValue::String("a");
+  SqlValue aa = SqlValue::String("aa");
+  SqlValue b = SqlValue::String("b");
+
+  ASSERT_LT(compare::SqlValue(a, aa), 0);
+  ASSERT_LT(compare::SqlValue(aa, b), 0);
+  ASSERT_LT(compare::SqlValue(a, b), 0);
+
+  ASSERT_GT(compare::SqlValue(aa, a), 0);
+
+  ASSERT_EQ(compare::SqlValue(a, a), 0);
+  ASSERT_EQ(compare::SqlValue(aa, SqlValue::String("aa")), 0);
+}
+
+}  // namespace
+}  // namespace trace_processor
+}  // namespace perfetto