trace_processor: overhaul and reenable args and raw tables

This CL reintroduces the args table but with args deduped by "arg set".

An arg set is a collection of args which appear together. It is very
common for a group of args to appear together with the same values:
for example, the args for sched_switch where swapper -> process A, or
the extreme case where, regardless of how many CPU idle events there
are, there will only be 32 possible pairs of (idle state, cpu) for an
8 CPU machine.

Thus, for each arg set we create, we can hash it, see if it's already
in the args table and if so we just refer to that existing set.

The logic for this is done by a newly introduced class ArgTracker which
buffers args being added for each row of another table and flushes to
storage at the end of the packet.

We also need to add columns to raw and counters tables to allow them to
reference the arg set in the args table.

Bug: 122513680
Change-Id: I1e99fa9eda3054b16ca48f0aaf03dde47fbbbdce
diff --git a/src/trace_processor/trace_storage.h b/src/trace_processor/trace_storage.h
index 2c64371..e19b7b0 100644
--- a/src/trace_processor/trace_storage.h
+++ b/src/trace_processor/trace_storage.h
@@ -60,9 +60,11 @@
 // The top 8 bits are set to the TableId and the bottom 32 to the row of the
 // table.
 using RowId = int64_t;
-
 static const RowId kInvalidRowId = 0;
 
+using ArgSetId = uint32_t;
+static const ArgSetId kInvalidArgSetId = 0;
+
 enum RefType {
   kRefNoRef = 0,
   kRefUtid = 1,
@@ -139,40 +141,84 @@
       };
     };
 
-    const std::deque<RowId>& ids() const { return ids_; }
+    struct Arg {
+      StringId flat_key = 0;
+      StringId key = 0;
+      Variadic value = Variadic::Integer(0);
+
+      // This is only used by the arg tracker and so is not part of the hash.
+      RowId row_id = 0;
+    };
+
+    struct ArgHasher {
+      uint64_t operator()(const Arg& arg) const noexcept {
+        uint64_t hash = kFnv1a64OffsetBasis;
+        hash ^= static_cast<decltype(hash)>(arg.key);
+        hash *= 1099511628211;  // FNV-1a-64 prime.
+        // We don't hash arg.flat_key because it's a subsequence of arg.key.
+        switch (arg.value.type) {
+          case Variadic::Type::kInt:
+            hash ^= static_cast<uint64_t>(arg.value.int_value);
+            break;
+          case Variadic::Type::kString:
+            hash ^= static_cast<uint64_t>(arg.value.string_value);
+            break;
+          case Variadic::Type::kReal:
+            hash ^= static_cast<uint64_t>(arg.value.real_value);
+            break;
+        }
+        hash *= kFnv1a64Prime;
+        return hash;
+      }
+    };
+
+    const std::deque<ArgSetId>& set_ids() const { return set_ids_; }
     const std::deque<StringId>& flat_keys() const { return flat_keys_; }
     const std::deque<StringId>& keys() const { return keys_; }
     const std::deque<Variadic>& arg_values() const { return arg_values_; }
-    const std::multimap<RowId, uint32_t>& args_for_id() const {
-      return args_for_id_;
+    uint32_t args_count() const {
+      return static_cast<uint32_t>(set_ids_.size());
     }
-    size_t args_count() const { return ids_.size(); }
 
-    void AddArg(RowId id, StringId flat_key, StringId key, Variadic value) {
-      // TODO(b/123252504): disable this code to stop blow-ups in ingestion time
-      // and memory.
-      perfetto::base::ignore_result(id);
-      perfetto::base::ignore_result(flat_key);
-      perfetto::base::ignore_result(key);
-      perfetto::base::ignore_result(value);
-      /*
-      if (id == kInvalidRowId)
-        return;
+    ArgSetId AddArgSet(const std::vector<Arg>& args,
+                       uint32_t begin,
+                       uint32_t end) {
+      ArgSetHash hash = kFnv1a64OffsetBasis;
+      for (uint32_t i = begin; i < end; i++) {
+        hash ^= ArgHasher()(args[i]);
+        hash *= kFnv1a64Prime;
+      }
 
-      ids_.emplace_back(id);
-      flat_keys_.emplace_back(flat_key);
-      keys_.emplace_back(key);
-      arg_values_.emplace_back(value);
-      args_for_id_.emplace(id, static_cast<uint32_t>(args_count() - 1));
-      */
+      auto it = arg_row_for_hash_.find(hash);
+      if (it != arg_row_for_hash_.end()) {
+        return set_ids_[it->second];
+      }
+
+      // The +1 ensures that nothing has an id == kInvalidArgSetId == 0.
+      ArgSetId id = static_cast<uint32_t>(arg_row_for_hash_.size()) + 1;
+      arg_row_for_hash_.emplace(hash, args_count());
+      for (uint32_t i = begin; i < end; i++) {
+        const auto& arg = args[i];
+        set_ids_.emplace_back(id);
+        flat_keys_.emplace_back(arg.flat_key);
+        keys_.emplace_back(arg.key);
+        arg_values_.emplace_back(arg.value);
+      }
+      return id;
     }
 
    private:
-    std::deque<RowId> ids_;
+    using ArgSetHash = uint64_t;
+
+    static constexpr uint64_t kFnv1a64OffsetBasis = 0xcbf29ce484222325;
+    static constexpr uint64_t kFnv1a64Prime = 0x100000001b3;  // 1099511628211
+
+    std::deque<ArgSetId> set_ids_;
     std::deque<StringId> flat_keys_;
     std::deque<StringId> keys_;
     std::deque<Variadic> arg_values_;
-    std::multimap<RowId, uint32_t> args_for_id_;
+
+    std::unordered_map<ArgSetHash, uint32_t> arg_row_for_hash_;
   };
 
   class Slices {
@@ -304,6 +350,7 @@
       values_.emplace_back(value);
       refs_.emplace_back(ref);
       types_.emplace_back(type);
+      arg_set_ids_.emplace_back(kInvalidArgSetId);
       return counter_count() - 1;
     }
 
@@ -311,6 +358,8 @@
       durations_[index] = duration;
     }
 
+    void set_arg_set_id(uint32_t row, ArgSetId id) { arg_set_ids_[row] = id; }
+
     size_t counter_count() const { return timestamps_.size(); }
 
     const std::deque<int64_t>& timestamps() const { return timestamps_; }
@@ -325,6 +374,8 @@
 
     const std::deque<RefType>& types() const { return types_; }
 
+    const std::deque<ArgSetId>& arg_set_ids() const { return arg_set_ids_; }
+
    private:
     std::deque<int64_t> timestamps_;
     std::deque<int64_t> durations_;
@@ -332,6 +383,7 @@
     std::deque<double> values_;
     std::deque<int64_t> refs_;
     std::deque<RefType> types_;
+    std::deque<ArgSetId> arg_set_ids_;
   };
 
   class SqlStats {
@@ -399,10 +451,13 @@
       name_ids_.emplace_back(name_id);
       cpus_.emplace_back(cpu);
       utids_.emplace_back(utid);
+      arg_set_ids_.emplace_back(kInvalidArgSetId);
       return CreateRowId(TableId::kRawEvents,
                          static_cast<uint32_t>(raw_event_count() - 1));
     }
 
+    void set_arg_set_id(uint32_t row, ArgSetId id) { arg_set_ids_[row] = id; }
+
     size_t raw_event_count() const { return timestamps_.size(); }
 
     const std::deque<int64_t>& timestamps() const { return timestamps_; }
@@ -413,11 +468,14 @@
 
     const std::deque<UniqueTid>& utids() const { return utids_; }
 
+    const std::deque<ArgSetId>& arg_set_ids() const { return arg_set_ids_; }
+
    private:
     std::deque<int64_t> timestamps_;
     std::deque<StringId> name_ids_;
     std::deque<uint32_t> cpus_;
     std::deque<UniqueTid> utids_;
+    std::deque<ArgSetId> arg_set_ids_;
   };
 
   class AndroidLogs {
@@ -524,10 +582,16 @@
   }
 
   static RowId CreateRowId(TableId table, uint32_t row) {
-    static constexpr uint8_t kRowIdTableShift = 32;
     return (static_cast<RowId>(table) << kRowIdTableShift) | row;
   }
 
+  static std::pair<int8_t /*table*/, uint32_t /*row*/> ParseRowId(RowId rowid) {
+    auto id = static_cast<uint64_t>(rowid);
+    auto table_id = static_cast<uint8_t>(id >> kRowIdTableShift);
+    auto row = static_cast<uint32_t>(id & ((1ull << kRowIdTableShift) - 1));
+    return std::make_pair(table_id, row);
+  }
+
   const Slices& slices() const { return slices_; }
   Slices* mutable_slices() { return &slices_; }
 
@@ -572,10 +636,12 @@
   std::pair<int64_t, int64_t> GetTraceTimestampBoundsNs() const;
 
  private:
-  TraceStorage& operator=(const TraceStorage&) = default;
+  static constexpr uint8_t kRowIdTableShift = 32;
 
   using StringHash = uint64_t;
 
+  TraceStorage& operator=(const TraceStorage&) = default;
+
   // Stats about parsing the trace.
   StatsMap stats_{};