trace_processor: fix sorting of events in sched tracker

Problem:
Sched and counter events arrive in order at the sched tracker but exit
out of order when placed in the trace storage. We make assumptions that
the data in trace storage is always sorted and this assumption does not
hold if sched tracker does not output events as it receives them.

Reason:
Computation of duration and other parameters which require both the begin
and end events means that, until now, we were waiting for the end event
before emitting the slice/counter. However, ends may happen out of order
with respect to the start timestamp of the slice.

Solution:
Place the data in trace storage as soon as it is received and backfill
data which cannot be computed straight away. Also switch from prev_comm
to next_comm to reduce backfill data for sched events.

Change-Id: Icecb4e409b6904a6f0d6dd046e9fbf0028968aa8
diff --git a/src/trace_processor/counters_table_unittest.cc b/src/trace_processor/counters_table_unittest.cc
index 211cb78..dc2ff0a 100644
--- a/src/trace_processor/counters_table_unittest.cc
+++ b/src/trace_processor/counters_table_unittest.cc
@@ -62,12 +62,16 @@
 TEST_F(CountersTableUnittest, SelectWhereCpu) {
   uint64_t timestamp = 1000;
   uint32_t freq = 3000;
+
   context_.storage->mutable_counters()->AddCounter(
-      timestamp, 0, 1, freq, 0, 1 /* cpu */, RefType::kCPU_ID);
+      timestamp, 0 /* dur */, 1, freq, 0 /* value delta */, 1 /* cpu */,
+      RefType::kCPU_ID);
   context_.storage->mutable_counters()->AddCounter(
-      timestamp + 1, 1, 1, freq + 1000, 1000, 1 /* cpu */, RefType::kCPU_ID);
+      timestamp + 1, 1 /* dur */, 1, freq + 1000, 1000 /* value delta */,
+      1 /* cpu */, RefType::kCPU_ID);
   context_.storage->mutable_counters()->AddCounter(
-      timestamp + 2, 1, 1, freq + 2000, 1000, 2 /* cpu */, RefType::kCPU_ID);
+      timestamp + 2, 1 /* dur */, 1, freq + 2000, 1000 /* value delta */,
+      2 /* cpu */, RefType::kCPU_ID);
 
   PrepareValidStatement("SELECT ts, dur, value FROM counters where ref = 1");
 
@@ -88,6 +92,7 @@
   uint64_t timestamp = 1000;
   uint32_t freq = 3000;
   uint32_t name_id = 1;
+
   context_.storage->mutable_counters()->AddCounter(
       timestamp, 1 /* dur */, name_id, freq, 0 /* value delta */, 1 /* cpu */,
       RefType::kCPU_ID);
diff --git a/src/trace_processor/process_tracker_unittest.cc b/src/trace_processor/process_tracker_unittest.cc
index 7ebeb5b..eac2bab 100644
--- a/src/trace_processor/process_tracker_unittest.cc
+++ b/src/trace_processor/process_tracker_unittest.cc
@@ -78,12 +78,9 @@
   static const char kCommProc2[] = "process2";
 
   context.sched_tracker->PushSchedSwitch(cpu, timestamp, /*tid=*/1, prev_state,
-                                         kCommProc1,
-                                         /*tid=*/4);
+                                         /*tid=*/4, kCommProc1);
   context.sched_tracker->PushSchedSwitch(cpu, timestamp + 1, /*tid=*/4,
-                                         prev_state, kCommProc2,
-
-                                         /*tid=*/1);
+                                         prev_state, /*tid=*/1, kCommProc2);
 
   context.process_tracker->UpdateProcess(2, "test");
   context.process_tracker->UpdateThread(4, 2);
diff --git a/src/trace_processor/proto_trace_parser.cc b/src/trace_processor/proto_trace_parser.cc
index c82982a..1cb2086 100644
--- a/src/trace_processor/proto_trace_parser.cc
+++ b/src/trace_processor/proto_trace_parser.cc
@@ -465,7 +465,7 @@
 
   uint32_t prev_pid = 0;
   uint32_t prev_state = 0;
-  base::StringView prev_comm;
+  base::StringView next_comm;
   uint32_t next_pid = 0;
   for (auto fld = decoder.ReadField(); fld.id != 0; fld = decoder.ReadField()) {
     switch (fld.id) {
@@ -475,18 +475,18 @@
       case protos::SchedSwitchFtraceEvent::kPrevStateFieldNumber:
         prev_state = fld.as_uint32();
         break;
-      case protos::SchedSwitchFtraceEvent::kPrevCommFieldNumber:
-        prev_comm = fld.as_string();
-        break;
       case protos::SchedSwitchFtraceEvent::kNextPidFieldNumber:
         next_pid = fld.as_uint32();
         break;
+      case protos::SchedSwitchFtraceEvent::kNextCommFieldNumber:
+        next_comm = fld.as_string();
+        break;
       default:
         break;
     }
   }
   context_->sched_tracker->PushSchedSwitch(cpu, timestamp, prev_pid, prev_state,
-                                           prev_comm, next_pid);
+                                           next_pid, next_comm);
   PERFETTO_DCHECK(decoder.IsEndOfBuffer());
 }
 
diff --git a/src/trace_processor/proto_trace_parser_unittest.cc b/src/trace_processor/proto_trace_parser_unittest.cc
index e502da5..1e6c9b6 100644
--- a/src/trace_processor/proto_trace_parser_unittest.cc
+++ b/src/trace_processor/proto_trace_parser_unittest.cc
@@ -48,8 +48,8 @@
                     uint64_t timestamp,
                     uint32_t prev_pid,
                     uint32_t prev_state,
-                    base::StringView prev_comm,
-                    uint32_t next_pid));
+                    uint32_t next_pid,
+                    base::StringView next_comm));
 
   MOCK_METHOD5(PushCounter,
                void(uint64_t timestamp,
@@ -119,11 +119,11 @@
   auto* sched_switch = event->mutable_sched_switch();
   sched_switch->set_prev_pid(10);
   sched_switch->set_prev_state(32);
-  sched_switch->set_prev_comm(kProcName);
+  sched_switch->set_next_comm(kProcName);
   sched_switch->set_next_pid(100);
 
-  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32,
-                                       base::StringView(kProcName), 100));
+  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32, 100,
+                                       base::StringView(kProcName)));
   Tokenize(trace);
 }
 
@@ -140,7 +140,7 @@
   auto* sched_switch = event->mutable_sched_switch();
   sched_switch->set_prev_pid(10);
   sched_switch->set_prev_state(32);
-  sched_switch->set_prev_comm(kProcName1);
+  sched_switch->set_next_comm(kProcName1);
   sched_switch->set_next_pid(100);
 
   event = bundle->add_event();
@@ -150,14 +150,14 @@
   sched_switch = event->mutable_sched_switch();
   sched_switch->set_prev_pid(100);
   sched_switch->set_prev_state(32);
-  sched_switch->set_prev_comm(kProcName2);
+  sched_switch->set_next_comm(kProcName2);
   sched_switch->set_next_pid(10);
 
-  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32,
-                                       base::StringView(kProcName1), 100));
+  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32, 100,
+                                       base::StringView(kProcName1)));
 
-  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1001, 100, 32,
-                                       base::StringView(kProcName2), 10));
+  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1001, 100, 32, 10,
+                                       base::StringView(kProcName2)));
 
   Tokenize(trace);
 }
@@ -175,7 +175,7 @@
   auto* sched_switch = event->mutable_sched_switch();
   sched_switch->set_prev_pid(10);
   sched_switch->set_prev_state(32);
-  sched_switch->set_prev_comm(kProcName1);
+  sched_switch->set_next_comm(kProcName1);
   sched_switch->set_next_pid(100);
 
   bundle = trace.add_packet()->mutable_ftrace_events();
@@ -188,14 +188,14 @@
   sched_switch = event->mutable_sched_switch();
   sched_switch->set_prev_pid(100);
   sched_switch->set_prev_state(32);
-  sched_switch->set_prev_comm(kProcName2);
+  sched_switch->set_next_comm(kProcName2);
   sched_switch->set_next_pid(10);
 
-  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32,
-                                       base::StringView(kProcName1), 100));
+  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32, 100,
+                                       base::StringView(kProcName1)));
 
-  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1001, 100, 32,
-                                       base::StringView(kProcName2), 10));
+  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1001, 100, 32, 10,
+                                       base::StringView(kProcName2)));
   Tokenize(trace);
 }
 
@@ -209,7 +209,7 @@
   auto* sched_switch = event->mutable_sched_switch();
   sched_switch->set_prev_pid(10);
   sched_switch->set_prev_state(32);
-  sched_switch->set_prev_comm(kProcName1);
+  sched_switch->set_next_comm(kProcName1);
   sched_switch->set_next_pid(100);
 
   protos::Trace trace_2;
@@ -221,15 +221,15 @@
   sched_switch = event->mutable_sched_switch();
   sched_switch->set_prev_pid(100);
   sched_switch->set_prev_state(32);
-  sched_switch->set_prev_comm(kProcName2);
+  sched_switch->set_next_comm(kProcName2);
   sched_switch->set_next_pid(10);
 
-  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32,
-                                       base::StringView(kProcName1), 100));
+  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1000, 10, 32, 100,
+                                       base::StringView(kProcName1)));
   Tokenize(trace_1);
 
-  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1001, 100, 32,
-                                       base::StringView(kProcName2), 10));
+  EXPECT_CALL(*sched_, PushSchedSwitch(10, 1001, 100, 32, 10,
+                                       base::StringView(kProcName2)));
   Tokenize(trace_2);
 }
 
diff --git a/src/trace_processor/sched_slice_table_unittest.cc b/src/trace_processor/sched_slice_table_unittest.cc
index 1f3cc18..57b0a95 100644
--- a/src/trace_processor/sched_slice_table_unittest.cc
+++ b/src/trace_processor/sched_slice_table_unittest.cc
@@ -70,15 +70,16 @@
   static const char kCommProc2[] = "process2";
   uint32_t pid_2 = 4;
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp, pid_1, prev_state,
-                                          kCommProc1, pid_2);
+                                          pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 3, pid_2, prev_state,
-                                          kCommProc2, pid_1);
+                                          pid_1, kCommProc2);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 4, pid_1, prev_state,
-                                          kCommProc1, pid_2);
+                                          pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 10, pid_2,
-                                          prev_state, kCommProc2, pid_1);
+                                          prev_state, pid_1, kCommProc2);
 
-  PrepareValidStatement("SELECT dur, ts, cpu FROM sched ORDER BY dur");
+  PrepareValidStatement(
+      "SELECT dur, ts, cpu FROM sched where dur != 0 ORDER BY dur");
 
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
   ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 1 /* duration */);
@@ -109,19 +110,20 @@
   static const char kCommProc2[] = "process2";
   uint32_t pid_2 = 4;
   context_.sched_tracker->PushSchedSwitch(cpu_3, timestamp - 2, pid_1,
-                                          prev_state, kCommProc1, pid_2);
+                                          prev_state, pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu_3, timestamp - 1, pid_2,
-                                          prev_state, kCommProc2, pid_1);
+                                          prev_state, pid_1, kCommProc2);
   context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp, pid_1, prev_state,
-                                          kCommProc1, pid_2);
+                                          pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 3, pid_2,
-                                          prev_state, kCommProc2, pid_1);
+                                          prev_state, pid_1, kCommProc2);
   context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp + 4, pid_1,
-                                          prev_state, kCommProc1, pid_2);
+                                          prev_state, pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 10, pid_2,
-                                          prev_state, kCommProc2, pid_1);
+                                          prev_state, pid_1, kCommProc2);
 
-  PrepareValidStatement("SELECT dur, ts, cpu FROM sched ORDER BY dur desc");
+  PrepareValidStatement(
+      "SELECT dur, ts, cpu FROM sched where dur != 0 ORDER BY dur desc");
 
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
   ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 7 /* duration */);
@@ -151,15 +153,16 @@
   static const char kCommProc2[] = "process2";
   uint32_t pid_2 = 4;
   context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp, pid_1, prev_state,
-                                          kCommProc1, pid_2);
+                                          pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 3, pid_2,
-                                          prev_state, kCommProc2, pid_1);
+                                          prev_state, pid_1, kCommProc2);
   context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp + 4, pid_1,
-                                          prev_state, kCommProc1, pid_2);
+                                          prev_state, pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 10, pid_2,
-                                          prev_state, kCommProc2, pid_1);
+                                          prev_state, pid_1, kCommProc2);
 
-  PrepareValidStatement("SELECT dur, ts, cpu FROM sched WHERE cpu = 3");
+  PrepareValidStatement(
+      "SELECT dur, ts, cpu FROM sched WHERE dur != 0 and cpu = 3");
 
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
   ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 4 /* duration */);
@@ -178,15 +181,15 @@
   static const char kCommProc2[] = "process2";
   uint32_t pid_2 = 4;
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp, pid_1, prev_state,
-                                          kCommProc1, pid_2);
+                                          pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 3, pid_2, prev_state,
-                                          kCommProc2, pid_1);
+                                          pid_1, kCommProc2);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 4, pid_1, prev_state,
-                                          kCommProc1, pid_2);
+                                          pid_2, kCommProc1);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 10, pid_2,
-                                          prev_state, kCommProc2, pid_1);
+                                          prev_state, pid_1, kCommProc2);
 
-  PrepareValidStatement("SELECT utid FROM sched ORDER BY utid");
+  PrepareValidStatement("SELECT utid FROM sched where dur != 0 ORDER BY utid");
 
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
   ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 1 /* duration */);
@@ -211,15 +214,16 @@
   // respectively, @ T=50 and T=70.
   for (uint64_t i = 0; i <= 11; i++) {
     context_.sched_tracker->PushSchedSwitch(cpu_5, 50 + i, pid_1, prev_state,
-                                            "pid_1", pid_1);
+                                            pid_1, "pid_1");
   }
   for (uint64_t i = 0; i <= 11; i++) {
     context_.sched_tracker->PushSchedSwitch(cpu_7, 70 + i, pid_2, prev_state,
-                                            "pid_2", pid_2);
+                                            pid_2, "pid_2");
   }
 
   auto query = [this](const std::string& where_clauses) {
-    PrepareValidStatement("SELECT ts from sched WHERE " + where_clauses);
+    PrepareValidStatement("SELECT ts from sched WHERE dur != 0 and " +
+                          where_clauses);
     std::vector<int> res;
     while (sqlite3_step(*stmt_) == SQLITE_ROW) {
       res.push_back(sqlite3_column_int(*stmt_, 0));
diff --git a/src/trace_processor/sched_tracker.cc b/src/trace_processor/sched_tracker.cc
index 4dcc097..67071a0 100644
--- a/src/trace_processor/sched_tracker.cc
+++ b/src/trace_processor/sched_tracker.cc
@@ -30,12 +30,16 @@
 
 SchedTracker::~SchedTracker() = default;
 
+StringId SchedTracker::GetThreadNameId(uint32_t tid, base::StringView comm) {
+  return tid == 0 ? idle_string_id_ : context_->storage->InternString(comm);
+}
+
 void SchedTracker::PushSchedSwitch(uint32_t cpu,
                                    uint64_t timestamp,
                                    uint32_t prev_pid,
-                                   uint32_t prev_state,
-                                   base::StringView prev_comm,
-                                   uint32_t next_pid) {
+                                   uint32_t,
+                                   uint32_t next_pid,
+                                   base::StringView next_comm) {
   // At this stage all events should be globally timestamp ordered.
   if (timestamp < prev_timestamp_) {
     PERFETTO_ELOG("sched_switch event out of order by %.4f ms, skipping",
@@ -44,31 +48,29 @@
   }
   prev_timestamp_ = timestamp;
   PERFETTO_DCHECK(cpu < base::kMaxCpus);
-  SchedSwitchEvent* prev = &last_sched_per_cpu_[cpu];
-  // If we had a valid previous event, then inform the storage about the
-  // slice.
-  if (prev->valid()) {
-    uint64_t duration = timestamp - prev->timestamp;
-    StringId prev_thread_name_id =
-        prev->next_pid == 0 ? idle_string_id_
-                            : context_->storage->InternString(prev_comm);
-    UniqueTid utid = context_->process_tracker->UpdateThread(
-        prev->timestamp, prev->next_pid /* == prev_pid */, prev_thread_name_id);
-    context_->storage->AddSliceToCpu(cpu, prev->timestamp, duration, utid);
+
+  auto* slices = context_->storage->mutable_slices();
+  auto* pending_slice = &pending_sched_per_cpu_[cpu];
+  if (pending_slice->storage_index < std::numeric_limits<size_t>::max()) {
+    // If this event's previous pid does not match the previous event's next
+    // pid, make a note of this.
+    if (prev_pid != pending_slice->pid) {
+      context_->storage->AddMismatchedSchedSwitch();
+    }
+
+    size_t idx = pending_slice->storage_index;
+    uint64_t duration = timestamp - slices->start_ns()[idx];
+    slices->set_duration(idx, duration);
   }
 
-  // If the this events previous pid does not match the previous event's next
-  // pid, make a note of this.
-  if (prev_pid != prev->next_pid) {
-    context_->storage->AddMismatchedSchedSwitch();
-  }
+  StringId name_id = GetThreadNameId(next_pid, next_comm);
+  auto utid =
+      context_->process_tracker->UpdateThread(timestamp, next_pid, name_id);
 
-  // Update the map with the current event.
-  prev->timestamp = timestamp;
-  prev->prev_pid = prev_pid;
-  prev->prev_state = prev_state;
-  prev->next_pid = next_pid;
-};
+  pending_slice->storage_index =
+      slices->AddSlice(cpu, timestamp, 0 /* duration */, utid);
+  pending_slice->pid = next_pid;
+}
 
 void SchedTracker::PushCounter(uint64_t timestamp,
                                double value,
@@ -82,24 +84,22 @@
   }
   prev_timestamp_ = timestamp;
 
-  // The previous counter with the same ref and name_id.
-  Counter& prev = prev_counters_[CounterKey{ref, name_id}];
+  auto* counters = context_->storage->mutable_counters();
+  const auto& key = CounterKey{ref, name_id};
+  auto counter_it = pending_counters_per_key_.find(key);
+  if (counter_it != pending_counters_per_key_.end()) {
+    size_t idx = counter_it->second;
 
-  uint64_t duration = 0;
-  double value_delta = 0;
-
-  if (prev.timestamp != 0) {
-    duration = timestamp - prev.timestamp;
-    value_delta = value - prev.value;
-
-    context_->storage->mutable_counters()->AddCounter(
-        prev.timestamp, duration, name_id, prev.value, value_delta,
-        static_cast<int64_t>(ref), ref_type);
+    uint64_t duration = timestamp - counters->timestamps()[idx];
+    double value_delta = value - counters->values()[idx];
+    counters->set_duration(idx, duration);
+    counters->set_value_delta(idx, value_delta);
   }
 
-  prev.timestamp = timestamp;
-  prev.value = value;
-};
+  pending_counters_per_key_[key] = counters->AddCounter(
+      timestamp, 0 /* duration */, name_id, value, 0 /* value_delta */,
+      static_cast<int64_t>(ref), ref_type);
+}
 
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/sched_tracker.h b/src/trace_processor/sched_tracker.h
index b2c5791..3679240 100644
--- a/src/trace_processor/sched_tracker.h
+++ b/src/trace_processor/sched_tracker.h
@@ -18,6 +18,7 @@
 #define SRC_TRACE_PROCESSOR_SCHED_TRACKER_H_
 
 #include <array>
+#include <limits>
 
 #include "perfetto/base/string_view.h"
 #include "perfetto/base/utils.h"
@@ -37,23 +38,25 @@
   SchedTracker& operator=(const SchedTracker&) = delete;
   virtual ~SchedTracker();
 
-  struct SchedSwitchEvent {
-    uint64_t timestamp = 0;
-    uint32_t prev_pid = 0;
-    uint32_t prev_state = 0;
-    uint32_t next_pid = 0;
+  StringId GetThreadNameId(uint32_t tid, base::StringView comm);
 
-    bool valid() const { return timestamp != 0; }
-  };
+  // This method is called when a sched switch event is seen in the trace.
+  virtual void PushSchedSwitch(uint32_t cpu,
+                               uint64_t timestamp,
+                               uint32_t prev_pid,
+                               uint32_t prev_state,
+                               uint32_t next_pid,
+                               base::StringView next_comm);
 
-  // A Counter is a trace event that has a value attached to a timestamp.
-  // These include CPU frequency ftrace events and systrace trace_marker
-  // counter events.
-  struct Counter {
-    uint64_t timestamp = 0;
-    double value = 0;
-  };
+  // This method is called when a cpu freq event is seen in the trace.
+  // TODO(taylori): Move to a more appropriate class or rename class.
+  virtual void PushCounter(uint64_t timestamp,
+                           double value,
+                           StringId name_id,
+                           uint64_t ref,
+                           RefType ref_type);
 
+ private:
   // Used as the key in |prev_counters_| to find the previous counter with the
   // same ref and name_id.
   struct CounterKey {
@@ -73,29 +76,18 @@
     };
   };
 
-  // This method is called when a sched switch event is seen in the trace.
-  virtual void PushSchedSwitch(uint32_t cpu,
-                               uint64_t timestamp,
-                               uint32_t prev_pid,
-                               uint32_t prev_state,
-                               base::StringView prev_comm,
-                               uint32_t next_pid);
+  // Represents a slice which is currently pending.
+  struct PendingSchedSlice {
+    size_t storage_index = std::numeric_limits<size_t>::max();
+    uint32_t pid = 0;
+  };
 
-  // This method is called when a cpu freq event is seen in the trace.
-  // TODO(taylori): Move to a more appropriate class or rename class.
-  virtual void PushCounter(uint64_t timestamp,
-                           double value,
-                           StringId name_id,
-                           uint64_t ref,
-                           RefType ref_type);
+  // Store pending sched slices for each CPU.
+  std::array<PendingSchedSlice, base::kMaxCpus> pending_sched_per_cpu_{};
 
- private:
-  // Store the previous sched event to calculate the duration before storing it.
-  std::array<SchedSwitchEvent, base::kMaxCpus> last_sched_per_cpu_;
-
-  // Store the previous counter event to calculate the duration and value delta
-  // before storing it in trace storage.
-  std::unordered_map<CounterKey, Counter, CounterKey::Hasher> prev_counters_;
+  // Store pending counters for each counter key.
+  std::unordered_map<CounterKey, size_t, CounterKey::Hasher>
+      pending_counters_per_key_;
 
   // Timestamp of the previous event. Used to discard events arriving out
   // of order.
diff --git a/src/trace_processor/sched_tracker_unittest.cc b/src/trace_processor/sched_tracker_unittest.cc
index a8a70c3..863052e 100644
--- a/src/trace_processor/sched_tracker_unittest.cc
+++ b/src/trace_processor/sched_tracker_unittest.cc
@@ -51,18 +51,18 @@
 
   const auto& timestamps = context.storage->slices().start_ns();
   context.sched_tracker->PushSchedSwitch(cpu, timestamp, pid_1, prev_state,
-                                         kCommProc1, pid_2);
-  ASSERT_EQ(timestamps.size(), 0);
+                                         pid_2, kCommProc1);
+  ASSERT_EQ(timestamps.size(), 1);
 
   context.sched_tracker->PushSchedSwitch(cpu, timestamp + 1, pid_2, prev_state,
-                                         kCommProc2, pid_1);
+                                         pid_1, kCommProc2);
 
-  ASSERT_EQ(timestamps.size(), 1ul);
+  ASSERT_EQ(timestamps.size(), 2ul);
   ASSERT_EQ(timestamps[0], timestamp);
   ASSERT_EQ(context.storage->GetThread(1).start_ns, timestamp);
   ASSERT_EQ(std::string(context.storage->GetString(
                 context.storage->GetThread(1).name_id)),
-            kCommProc2);
+            kCommProc1);
   ASSERT_EQ(context.storage->slices().utids().front(), 1);
 }
 
@@ -75,21 +75,20 @@
 
   const auto& timestamps = context.storage->slices().start_ns();
   context.sched_tracker->PushSchedSwitch(cpu, timestamp, /*tid=*/4, prev_state,
-                                         kCommProc1,
-                                         /*tid=*/2);
-  ASSERT_EQ(timestamps.size(), 0);
+                                         /*tid=*/2, kCommProc1);
+  ASSERT_EQ(timestamps.size(), 1);
 
   context.sched_tracker->PushSchedSwitch(cpu, timestamp + 1, /*tid=*/2,
-                                         prev_state, kCommProc1,
-                                         /*tid=*/4);
+                                         prev_state,
+                                         /*tid=*/4, kCommProc1);
   context.sched_tracker->PushSchedSwitch(cpu, timestamp + 11, /*tid=*/4,
-                                         prev_state, kCommProc2,
-                                         /*tid=*/2);
+                                         prev_state,
+                                         /*tid=*/2, kCommProc2);
   context.sched_tracker->PushSchedSwitch(cpu, timestamp + 31, /*tid=*/4,
-                                         prev_state, kCommProc1,
-                                         /*tid=*/2);
+                                         prev_state,
+                                         /*tid=*/2, kCommProc1);
 
-  ASSERT_EQ(timestamps.size(), 3ul);
+  ASSERT_EQ(timestamps.size(), 4ul);
   ASSERT_EQ(timestamps[0], timestamp);
   ASSERT_EQ(context.storage->GetThread(1).start_ns, timestamp);
   ASSERT_EQ(context.storage->slices().durations().at(0), 1u);
@@ -112,7 +111,7 @@
   context.sched_tracker->PushCounter(timestamp + 9, 1000, name_id, cpu,
                                      RefType::kCPU_ID);
 
-  ASSERT_EQ(context.storage->counters().counter_count(), 3ul);
+  ASSERT_EQ(context.storage->counters().counter_count(), 4ul);
   ASSERT_EQ(context.storage->counters().timestamps().at(0), timestamp);
   ASSERT_EQ(context.storage->counters().durations().at(0), 1);
   ASSERT_EQ(context.storage->counters().values().at(0), 1000);
@@ -141,7 +140,7 @@
   context.sched_tracker->PushCounter(timestamp + 9, 1, name_id_upid, upid,
                                      RefType::kUTID);
 
-  ASSERT_EQ(context.storage->counters().counter_count(), 2ul);
+  ASSERT_EQ(context.storage->counters().counter_count(), 4ul);
   ASSERT_EQ(context.storage->counters().timestamps().at(0), timestamp);
   ASSERT_EQ(context.storage->counters().durations().at(0), 3);
   ASSERT_EQ(context.storage->counters().values().at(0), 1000);
diff --git a/src/trace_processor/thread_table_unittest.cc b/src/trace_processor/thread_table_unittest.cc
index 49c0efc..10e1622 100644
--- a/src/trace_processor/thread_table_unittest.cc
+++ b/src/trace_processor/thread_table_unittest.cc
@@ -71,9 +71,9 @@
   static const char kThreadName2[] = "thread2";
 
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp, /*tid=*/1, prev_state,
-                                          kThreadName1, /*tid=*/4);
+                                          /*tid=*/4, kThreadName1);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 1, /*tid=*/4,
-                                          prev_state, kThreadName2, /*tid=*/1);
+                                          prev_state, /*tid=*/1, kThreadName2);
 
   context_.process_tracker->UpdateProcess(2, "test");
   context_.process_tracker->UpdateThread(4 /*tid*/, 2 /*pid*/);
@@ -83,7 +83,7 @@
   ASSERT_EQ(sqlite3_column_int(*stmt_, 0), 1 /* utid */);
   ASSERT_EQ(sqlite3_column_int(*stmt_, 1), 1 /* upid */);
   ASSERT_EQ(sqlite3_column_int(*stmt_, 2), 4 /* tid */);
-  ASSERT_STREQ(GetColumnAsText(3), kThreadName2);
+  ASSERT_STREQ(GetColumnAsText(3), kThreadName1);
 
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
 }
@@ -96,13 +96,12 @@
   static const char kThreadName2[] = "thread2";
 
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp, /*tid=*/1, prev_state,
-                                          kThreadName1,
-                                          /*tid=*/4);
+                                          /*tid=*/4, kThreadName1);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 1, /*tid=*/4,
-                                          prev_state, kThreadName2,
-                                          /*tid=*/1);
+                                          prev_state,
+                                          /*tid=*/1, kThreadName2);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 2, /*tid=*/1,
-                                          prev_state, kThreadName1, /*tid=*/4);
+                                          prev_state, /*tid=*/4, kThreadName1);
 
   context_.process_tracker->UpdateProcess(2, "test");
   context_.process_tracker->UpdateThread(4 /*tid*/, 2 /*pid*/);
@@ -114,7 +113,7 @@
   ASSERT_EQ(sqlite3_column_int(*stmt_, 0), 1 /* utid */);
   ASSERT_EQ(sqlite3_column_int(*stmt_, 1), 1 /* upid */);
   ASSERT_EQ(sqlite3_column_int(*stmt_, 2), 4 /* tid */);
-  ASSERT_STREQ(GetColumnAsText(3), kThreadName2);
+  ASSERT_STREQ(GetColumnAsText(3), kThreadName1);
 
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
 }
@@ -127,13 +126,10 @@
   static const char kThreadName2[] = "thread2";
 
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp, /*tid=*/1, prev_state,
-                                          kThreadName1,
-
-                                          /*tid=*/4);
+                                          /*tid=*/4, kThreadName1);
   context_.sched_tracker->PushSchedSwitch(cpu, timestamp + 1, /*tid=*/4,
-                                          prev_state, kThreadName2,
-
-                                          /*tid=*/1);
+                                          prev_state,
+                                          /*tid=*/1, kThreadName2);
 
   // Also create a process for which we haven't seen any thread.
   context_.process_tracker->UpdateProcess(7, "pid7");
@@ -166,7 +162,7 @@
   ASSERT_EQ(sqlite3_column_int(*stmt_, 1), 4 /* tid */);
   ASSERT_EQ(sqlite3_column_int(*stmt_, 4), 2 /* pid */);
   ASSERT_EQ(sqlite3_column_int(*stmt_, 3), 2 /* upid */);
-  ASSERT_STREQ(GetColumnAsText(2), kThreadName2);
+  ASSERT_STREQ(GetColumnAsText(2), kThreadName1);
   ASSERT_STREQ(GetColumnAsText(5), "pid2");
 
   ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
diff --git a/src/trace_processor/trace_database_integrationtest.cc b/src/trace_processor/trace_database_integrationtest.cc
index f191161..d6bf701 100644
--- a/src/trace_processor/trace_database_integrationtest.cc
+++ b/src/trace_processor/trace_database_integrationtest.cc
@@ -66,7 +66,10 @@
 TEST_F(TraceProcessorIntegrationTest, AndroidSchedAndPs) {
   ASSERT_TRUE(LoadTrace("android_sched_and_ps.pb"));
   protos::RawQueryResult res;
-  Query("select count(*), max(ts) - min(ts) from sched where utid != 0", &res);
+  Query(
+      "select count(*), max(ts) - min(ts) from sched "
+      "where dur != 0 and utid != 0",
+      &res);
   ASSERT_EQ(res.num_records(), 1);
   ASSERT_EQ(res.columns(0).long_values(0), 139789);
   ASSERT_EQ(res.columns(1).long_values(0), 19684308497);
diff --git a/src/trace_processor/trace_storage.cc b/src/trace_processor/trace_storage.cc
index 4443af0..e219c67 100644
--- a/src/trace_processor/trace_storage.cc
+++ b/src/trace_processor/trace_storage.cc
@@ -33,13 +33,6 @@
 
 TraceStorage::~TraceStorage() {}
 
-void TraceStorage::AddSliceToCpu(uint32_t cpu,
-                                 uint64_t start_ns,
-                                 uint64_t duration_ns,
-                                 UniqueTid utid) {
-  slices_.AddSlice(cpu, start_ns, duration_ns, utid);
-}
-
 StringId TraceStorage::InternString(base::StringView str) {
   auto hash = str.Hash();
   auto id_it = string_index_.find(hash);
diff --git a/src/trace_processor/trace_storage.h b/src/trace_processor/trace_storage.h
index 9a86cf4..eef8094 100644
--- a/src/trace_processor/trace_storage.h
+++ b/src/trace_processor/trace_storage.h
@@ -80,14 +80,19 @@
 
   class Slices {
    public:
-    inline void AddSlice(uint32_t cpu,
-                         uint64_t start_ns,
-                         uint64_t duration_ns,
-                         UniqueTid utid) {
+    inline size_t AddSlice(uint32_t cpu,
+                           uint64_t start_ns,
+                           uint64_t duration_ns,
+                           UniqueTid utid) {
       cpus_.emplace_back(cpu);
       start_ns_.emplace_back(start_ns);
       durations_.emplace_back(duration_ns);
       utids_.emplace_back(utid);
+      return slice_count() - 1;
+    }
+
+    void set_duration(size_t index, uint64_t duration_ns) {
+      durations_[index] = duration_ns;
     }
 
     size_t slice_count() const { return start_ns_.size(); }
@@ -154,13 +159,13 @@
 
   class Counters {
    public:
-    inline void AddCounter(uint64_t timestamp,
-                           uint64_t duration,
-                           StringId name_id,
-                           double value,
-                           double value_delta,
-                           int64_t ref,
-                           RefType type) {
+    inline size_t AddCounter(uint64_t timestamp,
+                             uint64_t duration,
+                             StringId name_id,
+                             double value,
+                             double value_delta,
+                             int64_t ref,
+                             RefType type) {
       timestamps_.emplace_back(timestamp);
       durations_.emplace_back(duration);
       name_ids_.emplace_back(name_id);
@@ -168,7 +173,17 @@
       value_deltas_.emplace_back(value_delta);
       refs_.emplace_back(ref);
       types_.emplace_back(type);
+      return counter_count() - 1;
     }
+
+    void set_duration(size_t index, uint64_t duration) {
+      durations_[index] = duration;
+    }
+
+    void set_value_delta(size_t index, double value_delta) {
+      value_deltas_[index] = value_delta;
+    }
+
     size_t counter_count() const { return timestamps_.size(); }
 
     const std::deque<uint64_t>& timestamps() const { return timestamps_; }
@@ -197,11 +212,6 @@
 
   void ResetStorage();
 
-  void AddSliceToCpu(uint32_t cpu,
-                     uint64_t start_ns,
-                     uint64_t duration_ns,
-                     UniqueTid utid);
-
   UniqueTid AddEmptyThread(uint32_t tid) {
     unique_threads_.emplace_back(tid);
     return static_cast<UniqueTid>(unique_threads_.size() - 1);
@@ -247,6 +257,8 @@
   }
 
   const Slices& slices() const { return slices_; }
+  Slices* mutable_slices() { return &slices_; }
+
   const NestableSlices& nestable_slices() const { return nestable_slices_; }
   NestableSlices* mutable_nestable_slices() { return &nestable_slices_; }