trace_processor: change cursor class to be scoped to a filter operation

Before this point, our cursors were structured in the same way as SQLite
cursors. That is, a cursor object can be reused for multiple filter
operations.

However, in pretty much all our cursor cases, the state of the cursor is
tied to a filter operation and not the scope of the SQLite cursor.

Change the scoping by creating a RawCursor class at the Table level
which handles conversion of scoping between SQLite filters and creation
of our cursors.

Bonus: this clears up a lot of "FilterState" and similar classes which
were scoping to a filter operation in the all the sub classes.

This also allows for neater implementation of multiple algorithms for
traversing a table (e.g. upcoming multi algorithm span tables).

Change-Id: I5068a9868c2271b6ab4655f61fcee5659b17c328
diff --git a/src/trace_processor/counters_table.cc b/src/trace_processor/counters_table.cc
index df5723d..cd14cc4 100644
--- a/src/trace_processor/counters_table.cc
+++ b/src/trace_processor/counters_table.cc
@@ -50,7 +50,9 @@
       {Column::kName, Column::kTimestamp, Column::kRef});
 }
 
-std::unique_ptr<Table::Cursor> CountersTable::CreateCursor() {
+std::unique_ptr<Table::Cursor> CountersTable::CreateCursor(
+    const QueryConstraints&,
+    sqlite3_value**) {
   return std::unique_ptr<Table::Cursor>(new Cursor(storage_));
 }
 
@@ -122,10 +124,6 @@
   return SQLITE_OK;
 }
 
-int CountersTable::Cursor::Filter(const QueryConstraints&, sqlite3_value**) {
-  return SQLITE_OK;
-}
-
 int CountersTable::Cursor::Next() {
   row_++;
   return SQLITE_OK;
diff --git a/src/trace_processor/counters_table.h b/src/trace_processor/counters_table.h
index 83e41d7..90929ec 100644
--- a/src/trace_processor/counters_table.h
+++ b/src/trace_processor/counters_table.h
@@ -44,7 +44,8 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(const QueryConstraints&,
+                                              sqlite3_value**) override;
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
 
  private:
@@ -53,7 +54,6 @@
     Cursor(const TraceStorage*);
 
     // Implementation of Table::Cursor.
-    int Filter(const QueryConstraints&, sqlite3_value**) override;
     int Next() override;
     int Eof() override;
     int Column(sqlite3_context*, int N) override;
diff --git a/src/trace_processor/process_table.cc b/src/trace_processor/process_table.cc
index fab28e7..97bc205 100644
--- a/src/trace_processor/process_table.cc
+++ b/src/trace_processor/process_table.cc
@@ -46,8 +46,10 @@
       {Column::kUpid});
 }
 
-std::unique_ptr<Table::Cursor> ProcessTable::CreateCursor() {
-  return std::unique_ptr<Table::Cursor>(new Cursor(storage_));
+std::unique_ptr<Table::Cursor> ProcessTable::CreateCursor(
+    const QueryConstraints& qc,
+    sqlite3_value** argv) {
+  return std::unique_ptr<Table::Cursor>(new Cursor(storage_, qc, argv));
 }
 
 int ProcessTable::BestIndex(const QueryConstraints& qc, BestIndexInfo* info) {
@@ -63,23 +65,55 @@
   return SQLITE_OK;
 }
 
-ProcessTable::Cursor::Cursor(const TraceStorage* storage) : storage_(storage) {}
+ProcessTable::Cursor::Cursor(const TraceStorage* storage,
+                             const QueryConstraints& qc,
+                             sqlite3_value** argv)
+    : storage_(storage) {
+  min = 1;
+  max = static_cast<uint32_t>(storage_->process_count());
+  desc = false;
+  current = min;
+
+  for (size_t j = 0; j < qc.constraints().size(); j++) {
+    const auto& cs = qc.constraints()[j];
+    if (cs.iColumn == Column::kUpid) {
+      auto constraint_upid = static_cast<UniquePid>(sqlite3_value_int(argv[j]));
+      // Set the range of upids that we are interested in, based on the
+      // constraints in the query. Everything between min and max (inclusive)
+      // will be returned.
+      if (IsOpEq(cs.op)) {
+        min = constraint_upid;
+        max = constraint_upid;
+      } else if (IsOpGe(cs.op) || IsOpGt(cs.op)) {
+        min = IsOpGt(cs.op) ? constraint_upid + 1 : constraint_upid;
+      } else if (IsOpLe(cs.op) || IsOpLt(cs.op)) {
+        max = IsOpLt(cs.op) ? constraint_upid - 1 : constraint_upid;
+      }
+    }
+  }
+  for (const auto& ob : qc.order_by()) {
+    if (ob.iColumn == Column::kUpid) {
+      desc = ob.desc;
+      current = desc ? max : min;
+    }
+  }
+}
 
 int ProcessTable::Cursor::Column(sqlite3_context* context, int N) {
   switch (N) {
     case Column::kUpid: {
-      sqlite3_result_int64(context, upid_filter_.current);
+      sqlite3_result_int64(context, current);
       break;
     }
     case Column::kName: {
-      const auto& process = storage_->GetProcess(upid_filter_.current);
+      const auto& process = storage_->GetProcess(current);
       const auto& name = storage_->GetString(process.name_id);
       sqlite3_result_text(context, name.c_str(),
                           static_cast<int>(name.length()), nullptr);
       break;
     }
     case Column::kPid: {
-      const auto& process = storage_->GetProcess(upid_filter_.current);
+      const auto& process = storage_->GetProcess(current);
       sqlite3_result_int64(context, process.pid);
       break;
     }
@@ -90,55 +124,17 @@
   return SQLITE_OK;
 }
 
-int ProcessTable::Cursor::Filter(const QueryConstraints& qc,
-                                 sqlite3_value** argv) {
-  upid_filter_.min = 1;
-  upid_filter_.max = static_cast<uint32_t>(storage_->process_count());
-  upid_filter_.desc = false;
-  upid_filter_.current = upid_filter_.min;
-
-  for (size_t j = 0; j < qc.constraints().size(); j++) {
-    const auto& cs = qc.constraints()[j];
-    if (cs.iColumn == Column::kUpid) {
-      auto constraint_upid = static_cast<UniquePid>(sqlite3_value_int(argv[j]));
-      // Set the range of upids that we are interested in, based on the
-      // constraints in the query. Everything between min and max (inclusive)
-      // will be returned.
-      if (IsOpEq(cs.op)) {
-        upid_filter_.min = constraint_upid;
-        upid_filter_.max = constraint_upid;
-      } else if (IsOpGe(cs.op) || IsOpGt(cs.op)) {
-        upid_filter_.min =
-            IsOpGt(cs.op) ? constraint_upid + 1 : constraint_upid;
-      } else if (IsOpLe(cs.op) || IsOpLt(cs.op)) {
-        upid_filter_.max =
-            IsOpLt(cs.op) ? constraint_upid - 1 : constraint_upid;
-      }
-    }
-  }
-  for (const auto& ob : qc.order_by()) {
-    if (ob.iColumn == Column::kUpid) {
-      upid_filter_.desc = ob.desc;
-      upid_filter_.current =
-          upid_filter_.desc ? upid_filter_.max : upid_filter_.min;
-    }
-  }
-
-  return SQLITE_OK;
-}
-
 int ProcessTable::Cursor::Next() {
-  if (upid_filter_.desc) {
-    --upid_filter_.current;
+  if (desc) {
+    --current;
   } else {
-    ++upid_filter_.current;
+    ++current;
   }
   return SQLITE_OK;
 }
 
 int ProcessTable::Cursor::Eof() {
-  return upid_filter_.desc ? upid_filter_.current < upid_filter_.min
-                           : upid_filter_.current > upid_filter_.max;
+  return desc ? current < min : current > max;
 }
 
 }  // namespace trace_processor
diff --git a/src/trace_processor/process_table.h b/src/trace_processor/process_table.h
index 9ba9ce5..8c08440 100644
--- a/src/trace_processor/process_table.h
+++ b/src/trace_processor/process_table.h
@@ -38,30 +38,26 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(const QueryConstraints&,
+                                              sqlite3_value**) override;
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
 
  private:
   class Cursor : public Table::Cursor {
    public:
-    Cursor(const TraceStorage*);
+    Cursor(const TraceStorage*, const QueryConstraints&, sqlite3_value**);
 
     // Implementation of Table::Cursor.
-    int Filter(const QueryConstraints&, sqlite3_value**) override;
     int Next() override;
     int Eof() override;
     int Column(sqlite3_context*, int N) override;
 
    private:
-    struct UpidFilter {
-      UniquePid min;
-      UniquePid max;
-      UniquePid current;
-      bool desc;
-    };
-
     const TraceStorage* const storage_;
-    UpidFilter upid_filter_;
+    UniquePid min;
+    UniquePid max;
+    UniquePid current;
+    bool desc;
   };
 
   const TraceStorage* const storage_;
diff --git a/src/trace_processor/sched_slice_table.cc b/src/trace_processor/sched_slice_table.cc
index bf0acf4..8915c65 100644
--- a/src/trace_processor/sched_slice_table.cc
+++ b/src/trace_processor/sched_slice_table.cc
@@ -107,8 +107,10 @@
       {Column::kCpu, Column::kTimestamp});
 }
 
-std::unique_ptr<Table::Cursor> SchedSliceTable::CreateCursor() {
-  return std::unique_ptr<Table::Cursor>(new Cursor(storage_));
+std::unique_ptr<Table::Cursor> SchedSliceTable::CreateCursor(
+    const QueryConstraints& qc,
+    sqlite3_value** argv) {
+  return std::unique_ptr<Table::Cursor>(new Cursor(storage_, qc, argv));
 }
 
 int SchedSliceTable::BestIndex(const QueryConstraints& qc,
@@ -126,56 +128,9 @@
   return SQLITE_OK;
 }
 
-SchedSliceTable::Cursor::Cursor(const TraceStorage* storage)
-    : storage_(storage) {}
-
-int SchedSliceTable::Cursor::Filter(const QueryConstraints& qc,
-                                    sqlite3_value** argv) {
-  filter_state_.reset(new FilterState(storage_, qc, argv));
-  return SQLITE_OK;
-}
-
-int SchedSliceTable::Cursor::Next() {
-  filter_state_->FindNextSlice();
-  return SQLITE_OK;
-}
-
-int SchedSliceTable::Cursor::Eof() {
-  return !filter_state_->IsNextRowIdIndexValid();
-}
-
-int SchedSliceTable::Cursor::Column(sqlite3_context* context, int N) {
-  PERFETTO_DCHECK(filter_state_->IsNextRowIdIndexValid());
-
-  size_t row = filter_state_->next_row_id();
-  const auto& slices = storage_->slices();
-  switch (N) {
-    case Column::kTimestamp: {
-      uint64_t ts = slices.start_ns()[row];
-      sqlite3_result_int64(context, static_cast<sqlite3_int64>(ts));
-      break;
-    }
-    case Column::kCpu: {
-      sqlite3_result_int(context, static_cast<int>(slices.cpus()[row]));
-      break;
-    }
-    case Column::kDuration: {
-      uint64_t duration = slices.durations()[row];
-      sqlite3_result_int64(context, static_cast<sqlite3_int64>(duration));
-      break;
-    }
-    case Column::kUtid: {
-      sqlite3_result_int64(context, slices.utids()[row]);
-      break;
-    }
-  }
-  return SQLITE_OK;
-}
-
-SchedSliceTable::FilterState::FilterState(
-    const TraceStorage* storage,
-    const QueryConstraints& query_constraints,
-    sqlite3_value** argv)
+SchedSliceTable::Cursor::Cursor(const TraceStorage* storage,
+                                const QueryConstraints& query_constraints,
+                                sqlite3_value** argv)
     : order_by_(query_constraints.order_by()), storage_(storage) {
   // Remove ordering on timestamp if it is the only ordering as we are already
   // sorted on TS. This makes span joining significantly faster.
@@ -220,8 +175,46 @@
   FindNextRowAndTimestamp();
 }
 
-void SchedSliceTable::FilterState::SetupSortedRowIds(uint64_t min_ts,
-                                                     uint64_t max_ts) {
+int SchedSliceTable::Cursor::Next() {
+  next_row_id_index_++;
+  FindNextRowAndTimestamp();
+  return SQLITE_OK;
+}
+
+int SchedSliceTable::Cursor::Eof() {
+  return !IsNextRowIdIndexValid();
+}
+
+int SchedSliceTable::Cursor::Column(sqlite3_context* context, int N) {
+  PERFETTO_DCHECK(IsNextRowIdIndexValid());
+
+  size_t row = next_row_id();
+  const auto& slices = storage_->slices();
+  switch (N) {
+    case Column::kTimestamp: {
+      uint64_t ts = slices.start_ns()[row];
+      sqlite3_result_int64(context, static_cast<sqlite3_int64>(ts));
+      break;
+    }
+    case Column::kCpu: {
+      sqlite3_result_int(context, static_cast<int>(slices.cpus()[row]));
+      break;
+    }
+    case Column::kDuration: {
+      uint64_t duration = slices.durations()[row];
+      sqlite3_result_int64(context, static_cast<sqlite3_int64>(duration));
+      break;
+    }
+    case Column::kUtid: {
+      sqlite3_result_int64(context, slices.utids()[row]);
+      break;
+    }
+  }
+  return SQLITE_OK;
+}
+
+void SchedSliceTable::Cursor::SetupSortedRowIds(uint64_t min_ts,
+                                                uint64_t max_ts) {
   const auto& slices = storage_->slices();
   const auto& start_ns = slices.start_ns();
   PERFETTO_CHECK(slices.slice_count() <= std::numeric_limits<uint32_t>::max());
@@ -244,7 +237,7 @@
   }
 }
 
-int SchedSliceTable::FilterState::CompareSlices(size_t f_idx, size_t s_idx) {
+int SchedSliceTable::Cursor::CompareSlices(size_t f_idx, size_t s_idx) {
   for (const auto& ob : order_by_) {
     int c = CompareSlicesOnColumn(f_idx, s_idx, ob);
     if (c != 0)
@@ -253,7 +246,7 @@
   return 0;
 }
 
-int SchedSliceTable::FilterState::CompareSlicesOnColumn(
+int SchedSliceTable::Cursor::CompareSlicesOnColumn(
     size_t f_idx,
     size_t s_idx,
     const QueryConstraints::OrderBy& ob) {
@@ -271,12 +264,7 @@
   PERFETTO_FATAL("Unexpected column %d", ob.iColumn);
 }
 
-void SchedSliceTable::FilterState::FindNextSlice() {
-  next_row_id_index_++;
-  FindNextRowAndTimestamp();
-}
-
-void SchedSliceTable::FilterState::FindNextRowAndTimestamp() {
+void SchedSliceTable::Cursor::FindNextRowAndTimestamp() {
   auto start =
       row_filter_.begin() +
       static_cast<decltype(row_filter_)::difference_type>(next_row_id_index_);
diff --git a/src/trace_processor/sched_slice_table.h b/src/trace_processor/sched_slice_table.h
index 255a3bb..ccd6b26 100644
--- a/src/trace_processor/sched_slice_table.h
+++ b/src/trace_processor/sched_slice_table.h
@@ -46,18 +46,22 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(
+      const QueryConstraints& query_constraints,
+      sqlite3_value** argv) override;
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
 
  private:
-  // Transient state for a filter operation on a Cursor.
-  class FilterState {
+  // Implementation of the Table cursor interface.
+  class Cursor : public Table::Cursor {
    public:
-    FilterState(const TraceStorage* storage,
-                const QueryConstraints& query_constraints,
-                sqlite3_value** argv);
+    Cursor(const TraceStorage* storage,
+           const QueryConstraints& query_constraints,
+           sqlite3_value** argv);
 
-    void FindNextSlice();
+    int Next() override;
+    int Eof() override;
+    int Column(sqlite3_context*, int N) override;
 
     inline bool IsNextRowIdIndexValid() const {
       return next_row_id_index_ < sorted_row_ids_.size();
@@ -101,22 +105,6 @@
     const TraceStorage* const storage_;
   };
 
-  // Implementation of the SQLite cursor interface.
-  class Cursor : public Table::Cursor {
-   public:
-    Cursor(const TraceStorage* storage);
-
-    // Implementation of Table::Cursor.
-    int Filter(const QueryConstraints&, sqlite3_value**) override;
-    int Next() override;
-    int Eof() override;
-    int Column(sqlite3_context*, int N) override;
-
-   private:
-    const TraceStorage* const storage_;
-    std::unique_ptr<FilterState> filter_state_;
-  };
-
   const TraceStorage* const storage_;
 };
 
diff --git a/src/trace_processor/slice_table.cc b/src/trace_processor/slice_table.cc
index ee486bd..905f73d 100644
--- a/src/trace_processor/slice_table.cc
+++ b/src/trace_processor/slice_table.cc
@@ -51,7 +51,8 @@
       {Column::kUtid, Column::kTimestamp, Column::kDepth});
 }
 
-std::unique_ptr<Table::Cursor> SliceTable::CreateCursor() {
+std::unique_ptr<Table::Cursor> SliceTable::CreateCursor(const QueryConstraints&,
+                                                        sqlite3_value**) {
   return std::unique_ptr<Table::Cursor>(new Cursor(storage_));
 }
 
@@ -68,11 +69,6 @@
 
 SliceTable::Cursor::~Cursor() = default;
 
-int SliceTable::Cursor::Filter(const QueryConstraints&,
-                               sqlite3_value** /*argv*/) {
-  return SQLITE_OK;
-}
-
 int SliceTable::Cursor::Next() {
   row_++;
   return SQLITE_OK;
diff --git a/src/trace_processor/slice_table.h b/src/trace_processor/slice_table.h
index d575a10..7b5bbd3 100644
--- a/src/trace_processor/slice_table.h
+++ b/src/trace_processor/slice_table.h
@@ -53,7 +53,8 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(const QueryConstraints&,
+                                              sqlite3_value**) override;
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
 
  private:
@@ -64,7 +65,6 @@
     ~Cursor() override;
 
     // Implementation of Table::Cursor.
-    int Filter(const QueryConstraints&, sqlite3_value**) override;
     int Next() override;
     int Eof() override;
     int Column(sqlite3_context*, int N) override;
diff --git a/src/trace_processor/span_operator_table.cc b/src/trace_processor/span_operator_table.cc
index cceba72..6f898bc 100644
--- a/src/trace_processor/span_operator_table.cc
+++ b/src/trace_processor/span_operator_table.cc
@@ -140,9 +140,13 @@
   return Schema(columns, {Column::kTimestamp, kJoinValue});
 }
 
-std::unique_ptr<Table::Cursor> SpanOperatorTable::CreateCursor() {
-  return std::unique_ptr<SpanOperatorTable::Cursor>(
+std::unique_ptr<Table::Cursor> SpanOperatorTable::CreateCursor(
+    const QueryConstraints& qc,
+    sqlite3_value** argv) {
+  auto cursor = std::unique_ptr<SpanOperatorTable::Cursor>(
       new SpanOperatorTable::Cursor(this, db_));
+  int value = cursor->Initialize(qc, argv);
+  return value != SQLITE_OK ? nullptr : std::move(cursor);
 }
 
 int SpanOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) {
@@ -167,6 +171,40 @@
 SpanOperatorTable::Cursor::Cursor(SpanOperatorTable* table, sqlite3* db)
     : db_(db), table_(table) {}
 
+int SpanOperatorTable::Cursor::Initialize(const QueryConstraints& qc,
+                                          sqlite3_value** argv) {
+  sqlite3_stmt* t1_raw = nullptr;
+  int err = PrepareRawStmt(qc, argv, table_->t1_defn_, true, &t1_raw);
+  t1_.stmt.reset(t1_raw);
+  if (err != SQLITE_OK)
+    return err;
+
+  sqlite3_stmt* t2_raw = nullptr;
+  err = PrepareRawStmt(qc, argv, table_->t2_defn_, false, &t2_raw);
+  t2_.stmt.reset(t2_raw);
+  if (err != SQLITE_OK)
+    return err;
+
+  err = sqlite3_step(t1_.stmt.get());
+  if (err != SQLITE_DONE) {
+    if (err != SQLITE_ROW)
+      return SQLITE_ERROR;
+    int64_t ts = sqlite3_column_int64(t1_.stmt.get(), Column::kTimestamp);
+    t1_.latest_ts = static_cast<uint64_t>(ts);
+    t1_.col_count = static_cast<size_t>(sqlite3_column_count(t1_.stmt.get()));
+  }
+
+  err = sqlite3_step(t2_.stmt.get());
+  if (err != SQLITE_DONE) {
+    if (err != SQLITE_ROW)
+      return SQLITE_ERROR;
+    int64_t ts = sqlite3_column_int64(t2_.stmt.get(), Column::kTimestamp);
+    t2_.latest_ts = static_cast<uint64_t>(ts);
+    t2_.col_count = static_cast<size_t>(sqlite3_column_count(t2_.stmt.get()));
+  }
+  return Next();
+}
+
 SpanOperatorTable::Cursor::~Cursor() {}
 
 int SpanOperatorTable::Cursor::PrepareRawStmt(const QueryConstraints& qc,
@@ -213,67 +251,7 @@
   return sqlite3_prepare_v2(db_, sql.c_str(), t1_size, stmt, nullptr);
 }
 
-int SpanOperatorTable::Cursor::Filter(const QueryConstraints& qc,
-                                      sqlite3_value** argv) {
-  sqlite3_stmt* t1_raw = nullptr;
-  int err = PrepareRawStmt(qc, argv, table_->t1_defn_, true, &t1_raw);
-  ScopedStmt t1_stmt(t1_raw);
-  if (err != SQLITE_OK)
-    return err;
-
-  sqlite3_stmt* t2_raw = nullptr;
-  err = PrepareRawStmt(qc, argv, table_->t2_defn_, false, &t2_raw);
-  ScopedStmt t2_stmt(t2_raw);
-  if (err != SQLITE_OK)
-    return err;
-
-  filter_state_.reset(
-      new FilterState(table_, std::move(t1_stmt), std::move(t2_stmt)));
-  return filter_state_->Initialize();
-}
-
 int SpanOperatorTable::Cursor::Next() {
-  return filter_state_->Next();
-}
-
-int SpanOperatorTable::Cursor::Eof() {
-  return filter_state_->Eof();
-}
-
-int SpanOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
-  return filter_state_->Column(context, N);
-}
-
-SpanOperatorTable::FilterState::FilterState(SpanOperatorTable* table,
-                                            ScopedStmt t1_stmt,
-                                            ScopedStmt t2_stmt)
-    : table_(table) {
-  t1_.stmt = std::move(t1_stmt);
-  t2_.stmt = std::move(t2_stmt);
-}
-
-int SpanOperatorTable::FilterState::Initialize() {
-  int err = sqlite3_step(t1_.stmt.get());
-  if (err != SQLITE_DONE) {
-    if (err != SQLITE_ROW)
-      return SQLITE_ERROR;
-    int64_t ts = sqlite3_column_int64(t1_.stmt.get(), Column::kTimestamp);
-    t1_.latest_ts = static_cast<uint64_t>(ts);
-    t1_.col_count = static_cast<size_t>(sqlite3_column_count(t1_.stmt.get()));
-  }
-
-  err = sqlite3_step(t2_.stmt.get());
-  if (err != SQLITE_DONE) {
-    if (err != SQLITE_ROW)
-      return SQLITE_ERROR;
-    int64_t ts = sqlite3_column_int64(t2_.stmt.get(), Column::kTimestamp);
-    t2_.latest_ts = static_cast<uint64_t>(ts);
-    t2_.col_count = static_cast<size_t>(sqlite3_column_count(t2_.stmt.get()));
-  }
-  return Next();
-}
-
-int SpanOperatorTable::FilterState::Next() {
   PERFETTO_DCHECK(!intersecting_spans_.empty() || children_have_more_);
 
   // If there are no more rows to be added from the child tables, simply pop the
@@ -315,7 +293,7 @@
   return SQLITE_OK;
 }
 
-PERFETTO_ALWAYS_INLINE int SpanOperatorTable::FilterState::ExtractNext(
+PERFETTO_ALWAYS_INLINE int SpanOperatorTable::Cursor::ExtractNext(
     bool pull_t1) {
   // Decide which table we will be retrieving a row from.
   TableState* pull_table = pull_t1 ? &t1_ : &t2_;
@@ -382,10 +360,9 @@
   return span_added ? SQLITE_ROW : SQLITE_DONE;
 }
 
-bool SpanOperatorTable::FilterState::MaybeAddIntersectingSpan(
-    int64_t join_value,
-    Span t1_span,
-    Span t2_span) {
+bool SpanOperatorTable::Cursor::MaybeAddIntersectingSpan(int64_t join_value,
+                                                         Span t1_span,
+                                                         Span t2_span) {
   uint64_t t1_end = t1_span.ts + t1_span.dur;
   uint64_t t2_end = t2_span.ts + t2_span.dur;
 
@@ -404,11 +381,11 @@
   return true;
 }
 
-int SpanOperatorTable::FilterState::Eof() {
+int SpanOperatorTable::Cursor::Eof() {
   return intersecting_spans_.empty() && !children_have_more_;
 }
 
-int SpanOperatorTable::FilterState::Column(sqlite3_context* context, int N) {
+int SpanOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
   const auto& ret = intersecting_spans_.front();
   switch (N) {
     case Column::kTimestamp:
@@ -429,7 +406,7 @@
   return SQLITE_OK;
 }
 
-PERFETTO_ALWAYS_INLINE void SpanOperatorTable::FilterState::ReportSqliteResult(
+PERFETTO_ALWAYS_INLINE void SpanOperatorTable::Cursor::ReportSqliteResult(
     sqlite3_context* context,
     SpanOperatorTable::Value value) {
   switch (value.type) {
diff --git a/src/trace_processor/span_operator_table.h b/src/trace_processor/span_operator_table.h
index 55c8c42..9994642 100644
--- a/src/trace_processor/span_operator_table.h
+++ b/src/trace_processor/span_operator_table.h
@@ -86,7 +86,8 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(const QueryConstraints&,
+                                              sqlite3_value**) override;
   int BestIndex(const QueryConstraints& qc, BestIndexInfo* info) override;
 
  private:
@@ -99,15 +100,16 @@
     std::string join_col_name;
   };
 
-  // State used when filtering on the span table.
-  class FilterState {
+  // Cursor on the span table.
+  class Cursor : public Table::Cursor {
    public:
-    FilterState(SpanOperatorTable*, ScopedStmt t1_stmt, ScopedStmt t2_stmt);
+    Cursor(SpanOperatorTable*, sqlite3* db);
+    ~Cursor() override;
 
-    int Initialize();
-    int Next();
-    int Eof();
-    int Column(sqlite3_context* context, int N);
+    int Initialize(const QueryConstraints& qc, sqlite3_value** argv);
+    int Next() override;
+    int Eof() override;
+    int Column(sqlite3_context* context, int N) override;
 
    private:
     // Details of a row of one of the child tables.
@@ -150,37 +152,20 @@
     void ReportSqliteResult(sqlite3_context* context,
                             SpanOperatorTable::Value value);
 
-    TableState t1_;
-    TableState t2_;
-
-    bool children_have_more_ = true;
-    std::deque<IntersectingSpan> intersecting_spans_;
-
-    SpanOperatorTable* const table_;
-  };
-
-  // Cursor on the span table.
-  class Cursor : public Table::Cursor {
-   public:
-    Cursor(SpanOperatorTable*, sqlite3* db);
-    ~Cursor() override;
-
-    // Methods to be implemented by derived table classes.
-    int Filter(const QueryConstraints& qc, sqlite3_value** argv) override;
-    int Next() override;
-    int Eof() override;
-    int Column(sqlite3_context* context, int N) override;
-
-   private:
     int PrepareRawStmt(const QueryConstraints& qc,
                        sqlite3_value** argv,
                        const TableDefinition& def,
                        bool is_t1,
                        sqlite3_stmt**);
 
+    TableState t1_;
+    TableState t2_;
+
+    bool children_have_more_ = true;
+    std::deque<IntersectingSpan> intersecting_spans_;
+
     sqlite3* const db_;
     SpanOperatorTable* const table_;
-    std::unique_ptr<FilterState> filter_state_;
   };
 
   // Converts a joined column index into an index on the columns of the child
diff --git a/src/trace_processor/string_table.cc b/src/trace_processor/string_table.cc
index adbcabe..deab74b 100644
--- a/src/trace_processor/string_table.cc
+++ b/src/trace_processor/string_table.cc
@@ -44,7 +44,9 @@
       {Column::kStringId});
 }
 
-std::unique_ptr<Table::Cursor> StringTable::CreateCursor() {
+std::unique_ptr<Table::Cursor> StringTable::CreateCursor(
+    const QueryConstraints&,
+    sqlite3_value**) {
   return std::unique_ptr<Table::Cursor>(new Cursor(storage_));
 }
 
@@ -60,11 +62,6 @@
 
 StringTable::Cursor::~Cursor() = default;
 
-int StringTable::Cursor::Filter(const QueryConstraints&,
-                                sqlite3_value** /*argv*/) {
-  return SQLITE_OK;
-}
-
 int StringTable::Cursor::Next() {
   row_++;
   return SQLITE_OK;
diff --git a/src/trace_processor/string_table.h b/src/trace_processor/string_table.h
index e74d834..a20df0f 100644
--- a/src/trace_processor/string_table.h
+++ b/src/trace_processor/string_table.h
@@ -42,7 +42,8 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(const QueryConstraints&,
+                                              sqlite3_value**) override;
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
 
  private:
@@ -53,7 +54,6 @@
     ~Cursor() override;
 
     // Implementation of Table::Cursor.
-    int Filter(const QueryConstraints&, sqlite3_value**) override;
     int Next() override;
     int Eof() override;
     int Column(sqlite3_context*, int N) override;
diff --git a/src/trace_processor/table.cc b/src/trace_processor/table.cc
index d81b467..228551d 100644
--- a/src/trace_processor/table.cc
+++ b/src/trace_processor/table.cc
@@ -37,8 +37,8 @@
   return static_cast<Table*>(vtab);
 }
 
-Table::Cursor* ToCursor(sqlite3_vtab_cursor* cursor) {
-  return static_cast<Table::Cursor*>(cursor);
+Table::RawCursor* ToCursor(sqlite3_vtab_cursor* cursor) {
+  return static_cast<Table::RawCursor*>(cursor);
 }
 
 std::string TypeToString(Table::ColumnType type) {
@@ -119,16 +119,20 @@
 
   module->xFilter = [](sqlite3_vtab_cursor* c, int i, const char* s, int a,
                        sqlite3_value** v) {
-    return ToCursor(c)->FilterInternal(i, s, a, v);
+    return ToCursor(c)->Filter(i, s, a, v);
   };
-  module->xNext = [](sqlite3_vtab_cursor* c) { return ToCursor(c)->Next(); };
-  module->xEof = [](sqlite3_vtab_cursor* c) { return ToCursor(c)->Eof(); };
+  module->xNext = [](sqlite3_vtab_cursor* c) {
+    return ToCursor(c)->cursor()->Next();
+  };
+  module->xEof = [](sqlite3_vtab_cursor* c) {
+    return ToCursor(c)->cursor()->Eof();
+  };
   module->xColumn = [](sqlite3_vtab_cursor* c, sqlite3_context* a, int b) {
-    return ToCursor(c)->Column(a, b);
+    return ToCursor(c)->cursor()->Column(a, b);
   };
 
   module->xRowid = [](sqlite3_vtab_cursor* c, sqlite3_int64* r) {
-    return ToCursor(c)->RowId(r);
+    return ToCursor(c)->cursor()->RowId(r);
   };
 
   module->xFindFunction =
@@ -151,7 +155,7 @@
 
 int Table::OpenInternal(sqlite3_vtab_cursor** ppCursor) {
   // Freed in xClose().
-  *ppCursor = static_cast<sqlite3_vtab_cursor*>(CreateCursor().release());
+  *ppCursor = static_cast<sqlite3_vtab_cursor*>(new RawCursor(this));
   return SQLITE_OK;
 }
 
@@ -218,16 +222,12 @@
   return SQLITE_READONLY;
 }
 
-Table::Cursor::~Cursor() = default;
+Table::RawCursor::RawCursor(Table* table) : table_(table) {}
 
-int Table::Cursor::RowId(sqlite3_int64*) {
-  return SQLITE_ERROR;
-}
-
-int Table::Cursor::FilterInternal(int idxNum,
-                                  const char* idxStr,
-                                  int argc,
-                                  sqlite3_value** argv) {
+int Table::RawCursor::Filter(int idxNum,
+                             const char* idxStr,
+                             int argc,
+                             sqlite3_value** argv) {
   auto* table = ToTable(this->pVtab);
   bool cache_hit = true;
   if (idxNum != table->qc_hash_) {
@@ -241,7 +241,14 @@
   }
   PERFETTO_DCHECK(table->qc_cache_.constraints().size() ==
                   static_cast<size_t>(argc));
-  return Filter(table->qc_cache_, argv);
+  cursor_ = table_->CreateCursor(table->qc_cache_, argv);
+  return !cursor_ ? SQLITE_ERROR : SQLITE_OK;
+}
+
+Table::Cursor::~Cursor() = default;
+
+int Table::Cursor::RowId(sqlite3_int64*) {
+  return SQLITE_ERROR;
 }
 
 Table::Column::Column(size_t index,
diff --git a/src/trace_processor/table.h b/src/trace_processor/table.h
index ba84d37..6e3f8d4 100644
--- a/src/trace_processor/table.h
+++ b/src/trace_processor/table.h
@@ -80,19 +80,27 @@
     virtual ~Cursor();
 
     // Methods to be implemented by derived table classes.
-    virtual int Filter(const QueryConstraints& qc, sqlite3_value** argv) = 0;
     virtual int Next() = 0;
     virtual int Eof() = 0;
     virtual int Column(sqlite3_context* context, int N) = 0;
 
     // Optional methods to implement.
     virtual int RowId(sqlite3_int64*);
+  };
+
+  // The raw cursor class which interfaces with SQLite to manage lifecycle.
+  class RawCursor : public sqlite3_vtab_cursor {
+   public:
+    explicit RawCursor(Table* table);
+
+    int Filter(int num, const char* idxStr, int argc, sqlite3_value**);
+    Cursor* cursor() { return cursor_.get(); }
 
    private:
     friend class Table;
 
-    // Overriden functions from sqlite3_vtab_cursor.
-    int FilterInternal(int num, const char* idxStr, int argc, sqlite3_value**);
+    Table* const table_;
+    std::unique_ptr<Cursor> cursor_;
   };
 
  protected:
@@ -141,7 +149,8 @@
 
   // Methods to be implemented by derived table classes.
   virtual Schema CreateSchema(int argc, const char* const* argv) = 0;
-  virtual std::unique_ptr<Cursor> CreateCursor() = 0;
+  virtual std::unique_ptr<Cursor> CreateCursor(const QueryConstraints& qc,
+                                               sqlite3_value** argv) = 0;
   virtual int BestIndex(const QueryConstraints& qc, BestIndexInfo* info) = 0;
 
   // Optional metods to implement.
diff --git a/src/trace_processor/thread_table.cc b/src/trace_processor/thread_table.cc
index 4b6f2be..52d1137 100644
--- a/src/trace_processor/thread_table.cc
+++ b/src/trace_processor/thread_table.cc
@@ -47,8 +47,10 @@
       {Column::kUtid});
 }
 
-std::unique_ptr<Table::Cursor> ThreadTable::CreateCursor() {
-  return std::unique_ptr<Table::Cursor>(new Cursor(storage_));
+std::unique_ptr<Table::Cursor> ThreadTable::CreateCursor(
+    const QueryConstraints& qc,
+    sqlite3_value** argv) {
+  return std::unique_ptr<Table::Cursor>(new Cursor(storage_, qc, argv));
 }
 
 int ThreadTable::BestIndex(const QueryConstraints& qc, BestIndexInfo* info) {
@@ -64,13 +66,45 @@
   return SQLITE_OK;
 }
 
-ThreadTable::Cursor::Cursor(const TraceStorage* storage) : storage_(storage) {}
+ThreadTable::Cursor::Cursor(const TraceStorage* storage,
+                            const QueryConstraints& qc,
+                            sqlite3_value** argv)
+    : storage_(storage) {
+  min = 1;
+  max = static_cast<uint32_t>(storage_->thread_count());
+  desc = false;
+  current = min;
+  for (size_t j = 0; j < qc.constraints().size(); j++) {
+    const auto& cs = qc.constraints()[j];
+    if (cs.iColumn == Column::kUtid) {
+      UniqueTid constraint_utid =
+          static_cast<UniqueTid>(sqlite3_value_int(argv[j]));
+      // Filter the range of utids that we are interested in, based on the
+      // constraints in the query. Everything between min and max (inclusive)
+      // will be returned.
+      if (IsOpEq(cs.op)) {
+        min = constraint_utid;
+        max = constraint_utid;
+      } else if (IsOpGe(cs.op) || IsOpGt(cs.op)) {
+        min = IsOpGt(cs.op) ? constraint_utid + 1 : constraint_utid;
+      } else if (IsOpLe(cs.op) || IsOpLt(cs.op)) {
+        max = IsOpLt(cs.op) ? constraint_utid - 1 : constraint_utid;
+      }
+    }
+  }
+  for (const auto& ob : qc.order_by()) {
+    if (ob.iColumn == Column::kUtid) {
+      desc = ob.desc;
+      current = desc ? max : min;
+    }
+  }
+}
 
 int ThreadTable::Cursor::Column(sqlite3_context* context, int N) {
-  const auto& thread = storage_->GetThread(utid_filter_.current);
+  const auto& thread = storage_->GetThread(current);
   switch (N) {
     case Column::kUtid: {
-      sqlite3_result_int64(context, utid_filter_.current);
+      sqlite3_result_int64(context, current);
       break;
     }
     case Column::kUpid: {
@@ -95,56 +129,17 @@
   return SQLITE_OK;
 }
 
-int ThreadTable::Cursor::Filter(const QueryConstraints& qc,
-                                sqlite3_value** argv) {
-  utid_filter_.min = 1;
-  utid_filter_.max = static_cast<uint32_t>(storage_->thread_count());
-  utid_filter_.desc = false;
-  utid_filter_.current = utid_filter_.min;
-  for (size_t j = 0; j < qc.constraints().size(); j++) {
-    const auto& cs = qc.constraints()[j];
-    if (cs.iColumn == Column::kUtid) {
-      UniqueTid constraint_utid =
-          static_cast<UniqueTid>(sqlite3_value_int(argv[j]));
-      // Filter the range of utids that we are interested in, based on the
-      // constraints in the query. Everything between min and max (inclusive)
-      // will be returned.
-      if (IsOpEq(cs.op)) {
-        utid_filter_.min = constraint_utid;
-        utid_filter_.max = constraint_utid;
-      } else if (IsOpGe(cs.op) || IsOpGt(cs.op)) {
-        utid_filter_.min =
-            IsOpGt(cs.op) ? constraint_utid + 1 : constraint_utid;
-      } else if (IsOpLe(cs.op) || IsOpLt(cs.op)) {
-        utid_filter_.max =
-            IsOpLt(cs.op) ? constraint_utid - 1 : constraint_utid;
-      }
-    }
-  }
-  for (const auto& ob : qc.order_by()) {
-    if (ob.iColumn == Column::kUtid) {
-      utid_filter_.desc = ob.desc;
-      utid_filter_.current =
-          utid_filter_.desc ? utid_filter_.max : utid_filter_.min;
-    }
-  }
-
-  return SQLITE_OK;
-}
-
 int ThreadTable::Cursor::Next() {
-  if (utid_filter_.desc) {
-    --utid_filter_.current;
+  if (desc) {
+    --current;
   } else {
-    ++utid_filter_.current;
+    ++current;
   }
-
   return SQLITE_OK;
 }
 
 int ThreadTable::Cursor::Eof() {
-  return utid_filter_.desc ? utid_filter_.current < utid_filter_.min
-                           : utid_filter_.current > utid_filter_.max;
+  return desc ? current < min : current > max;
 }
 }  // namespace trace_processor
 }  // namespace perfetto
diff --git a/src/trace_processor/thread_table.h b/src/trace_processor/thread_table.h
index ff6ef74..0542863 100644
--- a/src/trace_processor/thread_table.h
+++ b/src/trace_processor/thread_table.h
@@ -38,30 +38,28 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(const QueryConstraints&,
+                                              sqlite3_value**) override;
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
 
  private:
   class Cursor : public Table::Cursor {
    public:
-    Cursor(const TraceStorage*);
+    Cursor(const TraceStorage* storage,
+           const QueryConstraints&,
+           sqlite3_value**);
 
     // Implementation of Table::Cursor.
-    int Filter(const QueryConstraints&, sqlite3_value**) override;
     int Next() override;
     int Eof() override;
     int Column(sqlite3_context*, int N) override;
 
    private:
-    struct UtidFilter {
-      UniqueTid min;
-      UniqueTid max;
-      UniqueTid current;
-      bool desc;
-    };
-
     const TraceStorage* const storage_;
-    UtidFilter utid_filter_;
+    UniqueTid min;
+    UniqueTid max;
+    UniqueTid current;
+    bool desc;
   };
 
   const TraceStorage* const storage_;
diff --git a/src/trace_processor/window_operator_table.cc b/src/trace_processor/window_operator_table.cc
index 0c4dbc4..396e18b 100644
--- a/src/trace_processor/window_operator_table.cc
+++ b/src/trace_processor/window_operator_table.cc
@@ -53,11 +53,13 @@
       {Column::kRowId});
 }
 
-std::unique_ptr<Table::Cursor> WindowOperatorTable::CreateCursor() {
+std::unique_ptr<Table::Cursor> WindowOperatorTable::CreateCursor(
+    const QueryConstraints& qc,
+    sqlite3_value** argv) {
   uint64_t window_end = window_start_ + window_dur_;
   uint64_t step_size = quantum_ == 0 ? window_dur_ : quantum_;
   return std::unique_ptr<Table::Cursor>(
-      new Cursor(this, window_start_, window_end, step_size));
+      new Cursor(this, window_start_, window_end, step_size, qc, argv));
 }
 
 int WindowOperatorTable::BestIndex(const QueryConstraints& qc,
@@ -89,11 +91,34 @@
 WindowOperatorTable::Cursor::Cursor(const WindowOperatorTable* table,
                                     uint64_t window_start,
                                     uint64_t window_end,
-                                    uint64_t step_size)
+                                    uint64_t step_size,
+                                    const QueryConstraints& qc,
+                                    sqlite3_value** argv)
     : window_start_(window_start),
       window_end_(window_end),
       step_size_(step_size),
-      table_(table) {}
+      table_(table) {
+  current_ts_ = window_start_;
+
+  // Set return first if there is a equals constraint on the row id asking to
+  // return the first row.
+  bool return_first = qc.constraints().size() == 1 &&
+                      qc.constraints()[0].iColumn == Column::kRowId &&
+                      IsOpEq(qc.constraints()[0].op) &&
+                      sqlite3_value_int(argv[0]) == 0;
+  // Set return CPU if there is an equals constraint on the CPU column.
+  bool return_cpu = qc.constraints().size() == 1 &&
+                    qc.constraints()[0].iColumn == Column::kCpu &&
+                    IsOpEq(qc.constraints()[0].op);
+  if (return_first) {
+    filter_type_ = FilterType::kReturnFirst;
+  } else if (return_cpu) {
+    filter_type_ = FilterType::kReturnCpu;
+    current_cpu_ = static_cast<uint32_t>(sqlite3_value_int(argv[0]));
+  } else {
+    filter_type_ = FilterType::kReturnAll;
+  }
+}
 
 int WindowOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
   switch (N) {
@@ -139,34 +164,6 @@
   return SQLITE_OK;
 }
 
-int WindowOperatorTable::Cursor::Filter(const QueryConstraints& qc,
-                                        sqlite3_value** v) {
-  current_ts_ = window_start_;
-  current_cpu_ = 0;
-  quantum_ts_ = 0;
-  row_id_ = 0;
-
-  // Set return first if there is a equals constraint on the row id asking to
-  // return the first row.
-  bool return_first = qc.constraints().size() == 1 &&
-                      qc.constraints()[0].iColumn == Column::kRowId &&
-                      IsOpEq(qc.constraints()[0].op) &&
-                      sqlite3_value_int(v[0]) == 0;
-  // Set return CPU if there is an equals constraint on the CPU column.
-  bool return_cpu = qc.constraints().size() == 1 &&
-                    qc.constraints()[0].iColumn == Column::kCpu &&
-                    IsOpEq(qc.constraints()[0].op);
-  if (return_first) {
-    filter_type_ = FilterType::kReturnFirst;
-  } else if (return_cpu) {
-    filter_type_ = FilterType::kReturnCpu;
-    current_cpu_ = static_cast<uint32_t>(sqlite3_value_int(v[0]));
-  } else {
-    filter_type_ = FilterType::kReturnAll;
-  }
-  return SQLITE_OK;
-}
-
 int WindowOperatorTable::Cursor::Next() {
   switch (filter_type_) {
     case FilterType::kReturnFirst:
diff --git a/src/trace_processor/window_operator_table.h b/src/trace_processor/window_operator_table.h
index 37b8f30..dd2df57 100644
--- a/src/trace_processor/window_operator_table.h
+++ b/src/trace_processor/window_operator_table.h
@@ -46,7 +46,8 @@
 
   // Table implementation.
   Table::Schema CreateSchema(int argc, const char* const* argv) override;
-  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  std::unique_ptr<Table::Cursor> CreateCursor(const QueryConstraints&,
+                                              sqlite3_value**) override;
   int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
   int Update(int, sqlite3_value**, sqlite3_int64*) override;
 
@@ -56,10 +57,11 @@
     Cursor(const WindowOperatorTable*,
            uint64_t window_start,
            uint64_t window_end,
-           uint64_t step_size);
+           uint64_t step_size,
+           const QueryConstraints& qc,
+           sqlite3_value** argv);
 
     // Implementation of Table::Cursor.
-    int Filter(const QueryConstraints&, sqlite3_value**) override;
     int Next() override;
     int Eof() override;
     int Column(sqlite3_context*, int N) override;