Merge changes I8aec6ee1,I25f017c0,I4e49a5f0,Ie794521a,I22340a55, ...
* changes:
perfetto-ui: Update CPU track to use span join
trace_processor: optimize performance of sched, span and window tables
trace_processor: add special handling for CPU in window operator
trace_processor: generify and cleanup span join
trace_processor: add window virtual table
trace_processor: add the SPAN JOIN virtual table
Reland "trace_processor: collapse sched slice table to a flat table"
diff --git a/src/ipc/unix_socket_unittest.cc b/src/ipc/unix_socket_unittest.cc
index b571c1c..024072f 100644
--- a/src/ipc/unix_socket_unittest.cc
+++ b/src/ipc/unix_socket_unittest.cc
@@ -244,8 +244,8 @@
int buf_fd[2] = {null_fd.get(), zero_fd.get()};
- ASSERT_TRUE(cli->Send(cli_str, sizeof(cli_str), buf_fd,
- base::ArraySize(buf_fd)));
+ ASSERT_TRUE(
+ cli->Send(cli_str, sizeof(cli_str), buf_fd, base::ArraySize(buf_fd)));
ASSERT_TRUE(srv_conn->Send(srv_str, sizeof(srv_str), buf_fd,
base::ArraySize(buf_fd)));
task_runner_.RunUntilCheckpoint("srv_did_recv");
diff --git a/src/profiling/memory/record_reader.cc b/src/profiling/memory/record_reader.cc
index 704154d..931b41a 100644
--- a/src/profiling/memory/record_reader.cc
+++ b/src/profiling/memory/record_reader.cc
@@ -28,7 +28,7 @@
constexpr size_t kMaxRecordSize = 8 * 1024 * 1024; // 8 MiB
static_assert(kMaxRecordSize <= std::numeric_limits<size_t>::max(),
"kMaxRecordSize must fit into size_t");
-}
+} // namespace
RecordReader::ReceiveBuffer RecordReader::BeginReceive() {
if (read_idx_ < sizeof(record_size_buf_))
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index 092c6bf..978f128 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -65,6 +65,8 @@
"slice_table.h",
"slice_tracker.cc",
"slice_tracker.h",
+ "span_operator_table.cc",
+ "span_operator_table.h",
"sqlite_utils.h",
"string_table.cc",
"string_table.h",
@@ -82,6 +84,8 @@
"trace_storage.cc",
"trace_storage.h",
"virtual_destructors.cc",
+ "window_operator_table.cc",
+ "window_operator_table.h",
]
deps = [
"../../buildtools:sqlite",
@@ -142,6 +146,7 @@
"sched_slice_table_unittest.cc",
"sched_tracker_unittest.cc",
"slice_tracker_unittest.cc",
+ "span_operator_table_unittest.cc",
"thread_table_unittest.cc",
"trace_sorter_unittest.cc",
]
diff --git a/src/trace_processor/sched_slice_table.cc b/src/trace_processor/sched_slice_table.cc
index dfdde24..ab01a6d 100644
--- a/src/trace_processor/sched_slice_table.cc
+++ b/src/trace_processor/sched_slice_table.cc
@@ -101,11 +101,7 @@
"ts UNSIGNED BIG INT, "
"cpu UNSIGNED INT, "
"dur UNSIGNED BIG INT, "
- "quantized_group UNSIGNED BIG INT, "
"utid UNSIGNED INT, "
- "quantum HIDDEN BIG INT, "
- "ts_lower_bound HIDDEN BIG INT, "
- "ts_clip HIDDEN BOOLEAN, "
"PRIMARY KEY(cpu, ts)"
") WITHOUT ROWID;";
}
@@ -119,82 +115,16 @@
bool is_time_constrained = false;
for (size_t i = 0; i < qc.constraints().size(); i++) {
const auto& cs = qc.constraints()[i];
-
- // Omit SQLite constraint checks on the hidden columns, so the client can
- // write queries of the form "quantum=x" "ts_lower_bound=x" "ts_clip=true".
- // Disallow any other constraint on these columns.
- if (cs.iColumn == Column::kTimestampLowerBound ||
- cs.iColumn == Column::kQuantizedGroup ||
- cs.iColumn == Column::kClipTimestamp) {
- if (!IsOpEq(cs.op))
- return SQLITE_CONSTRAINT_FUNCTION;
- info->omit[i] = true;
- }
-
- if (cs.iColumn == Column::kTimestampLowerBound ||
- cs.iColumn == Column::kTimestamp) {
+ if (cs.iColumn == Column::kTimestamp)
is_time_constrained = true;
- }
}
info->estimated_cost = is_time_constrained ? 10 : 10000;
-
- bool is_quantized_group_order_desc = false;
- bool is_duration_timestamp_order = false;
- for (const auto& ob : qc.order_by()) {
- switch (ob.iColumn) {
- case Column::kQuantizedGroup:
- if (ob.desc)
- is_quantized_group_order_desc = true;
- break;
- case Column::kTimestamp:
- case Column::kDuration:
- is_duration_timestamp_order = true;
- break;
- case Column::kCpu:
- break;
-
- // Can't order on hidden columns.
- case Column::kQuantum:
- case Column::kTimestampLowerBound:
- case Column::kClipTimestamp:
- return SQLITE_CONSTRAINT_FUNCTION;
- }
- }
-
- bool has_quantum_constraint = false;
- for (const auto& cs : qc.constraints()) {
- if (cs.iColumn == Column::kQuantum)
- has_quantum_constraint = true;
- }
-
- // If a quantum constraint is present, we don't support native ordering by
- // time related parameters or by quantized group in descending order.
- bool needs_sqlite_orderby =
- has_quantum_constraint &&
- (is_duration_timestamp_order || is_quantized_group_order_desc);
-
- info->order_by_consumed = !needs_sqlite_orderby;
+ info->order_by_consumed = true;
return SQLITE_OK;
}
-int SchedSliceTable::FindFunction(const char* name,
- FindFunctionFn fn,
- void** args) {
- // Add an identity match function to prevent throwing an exception when
- // matching on the quantum column.
- if (strcmp(name, "match") == 0) {
- *fn = [](sqlite3_context* ctx, int n, sqlite3_value** v) {
- PERFETTO_DCHECK(n == 2 && sqlite3_value_type(v[0]) == SQLITE_INTEGER);
- sqlite3_result_int64(ctx, sqlite3_value_int64(v[0]));
- };
- *args = nullptr;
- return 1;
- }
- return 0;
-}
-
SchedSliceTable::Cursor::Cursor(const TraceStorage* storage)
: storage_(storage) {}
@@ -205,71 +135,34 @@
}
int SchedSliceTable::Cursor::Next() {
- auto* state = filter_state_->StateForCpu(filter_state_->next_cpu());
- state->FindNextSlice();
- filter_state_->FindCpuWithNextSlice();
+ filter_state_->FindNextSlice();
return SQLITE_OK;
}
int SchedSliceTable::Cursor::Eof() {
- return !filter_state_->IsNextCpuValid();
+ return !filter_state_->IsNextRowIdIndexValid();
}
int SchedSliceTable::Cursor::Column(sqlite3_context* context, int N) {
- if (!filter_state_->IsNextCpuValid())
- return SQLITE_ERROR;
+ PERFETTO_DCHECK(filter_state_->IsNextRowIdIndexValid());
- uint64_t quantum = filter_state_->quantum();
- uint32_t cpu = filter_state_->next_cpu();
- const auto* state = filter_state_->StateForCpu(cpu);
- size_t row = state->next_row_id();
- const auto& slices = storage_->SlicesForCpu(cpu);
+ size_t row = filter_state_->next_row_id();
+ const auto& slices = storage_->slices();
switch (N) {
case Column::kTimestamp: {
- uint64_t timestamp = state->next_timestamp();
- timestamp = std::max(timestamp, state->ts_clip_min());
- sqlite3_result_int64(context, static_cast<sqlite3_int64>(timestamp));
+ uint64_t ts = slices.start_ns()[row];
+ sqlite3_result_int64(context, static_cast<sqlite3_int64>(ts));
break;
}
case Column::kCpu: {
- sqlite3_result_int(context, static_cast<int>(cpu));
+ sqlite3_result_int(context, static_cast<int>(slices.cpus()[row]));
break;
}
case Column::kDuration: {
- uint64_t duration;
- if (quantum == 0) {
- duration = slices.durations()[row];
- uint64_t start_ns = state->next_timestamp();
- uint64_t end_ns = start_ns + duration;
- uint64_t clip_trim_ns = 0;
- if (state->ts_clip_min() > start_ns)
- clip_trim_ns += state->ts_clip_min() - start_ns;
- if (end_ns > state->ts_clip_max())
- clip_trim_ns += end_ns - state->ts_clip_min();
- duration -= std::min(clip_trim_ns, duration);
- } else {
- uint64_t start_quantised_group = state->next_timestamp() / quantum;
- uint64_t end = slices.start_ns()[row] + slices.durations()[row];
- uint64_t next_group_start = (start_quantised_group + 1) * quantum;
-
- // Compute the minimum of the start of the next group boundary and the
- // end of this slice.
- uint64_t min_slice_end = std::min<uint64_t>(end, next_group_start);
- duration = min_slice_end - state->next_timestamp();
- }
+ uint64_t duration = slices.durations()[row];
sqlite3_result_int64(context, static_cast<sqlite3_int64>(duration));
break;
}
- case Column::kQuantizedGroup: {
- auto group = quantum == 0 ? state->next_timestamp()
- : state->next_timestamp() / quantum;
- sqlite3_result_int64(context, static_cast<sqlite3_int64>(group));
- break;
- }
- case Column::kQuantum: {
- sqlite3_result_int64(context, static_cast<sqlite3_int64>(quantum));
- break;
- }
case Column::kUtid: {
sqlite3_result_int64(context, slices.utids()[row]);
break;
@@ -283,13 +176,18 @@
const QueryConstraints& query_constraints,
sqlite3_value** argv)
: order_by_(query_constraints.order_by()), storage_(storage) {
+ // Drop an ascending ORDER BY ts when it is the only ordering: rows are
+ // already sorted by ts, so re-sorting is redundant (speeds up span joins).
+ if (order_by_.size() == 1 && order_by_[0].iColumn == Column::kTimestamp &&
+ !order_by_[0].desc) {
+ order_by_.clear();
+ }
+
std::bitset<base::kMaxCpus> cpu_filter;
cpu_filter.set();
uint64_t min_ts = 0;
uint64_t max_ts = kUint64Max;
- uint64_t ts_lower_bound = 0;
- bool ts_clip = false;
for (size_t i = 0; i < query_constraints.constraints().size(); i++) {
const auto& cs = query_constraints.constraints()[i];
@@ -297,15 +195,6 @@
case Column::kCpu:
PopulateFilterBitmap(cs.op, argv[i], &cpu_filter);
break;
- case Column::kQuantum:
- quantum_ = static_cast<uint64_t>(sqlite3_value_int64(argv[i]));
- break;
- case Column::kTimestampLowerBound:
- ts_lower_bound = static_cast<uint64_t>(sqlite3_value_int64(argv[i]));
- break;
- case Column::kClipTimestamp:
- ts_clip = sqlite3_value_int(argv[i]) ? true : false;
- break;
case Column::kTimestamp: {
auto ts = static_cast<uint64_t>(sqlite3_value_int64(argv[i]));
if (IsOpGe(cs.op) || IsOpGt(cs.op)) {
@@ -317,92 +206,22 @@
}
}
}
+ SetupSortedRowIds(min_ts, max_ts);
- if (ts_clip) {
- PERFETTO_DCHECK(ts_lower_bound == 0);
- if (ts_lower_bound)
- PERFETTO_ELOG("Cannot use ts_lower_bound and ts_clip together");
- ts_lower_bound = min_ts;
- min_ts = 0;
- }
-
- // If the query specifies a lower bound on ts, find that bound across all
- // CPUs involved in the query and turn that into a min_ts constraint.
- // ts_lower_bound is defined as the largest timestamp < X, or if none, the
- // smallest timestamp >= X.
- if (ts_lower_bound > 0) {
- uint64_t largest_ts_before = 0;
- uint64_t smallest_ts_after = kUint64Max;
- for (uint32_t cpu = 0; cpu < base::kMaxCpus; cpu++) {
- if (!cpu_filter.test(cpu))
- continue;
- const auto& start_ns = storage_->SlicesForCpu(cpu).start_ns();
- // std::lower_bound will find the first timestamp >= |ts_lower_bound|.
- // From there we need to move one back, if possible.
- auto it =
- std::lower_bound(start_ns.begin(), start_ns.end(), ts_lower_bound);
- if (std::distance(start_ns.begin(), it) > 0)
- it--;
- if (it == start_ns.end())
- continue;
- if (*it < ts_lower_bound) {
- largest_ts_before = std::max(largest_ts_before, *it);
- } else {
- smallest_ts_after = std::min(smallest_ts_after, *it);
- }
+ // Filter rows on CPUs if any CPUs need to be excluded.
+ const auto& slices = storage_->slices();
+ row_filter_.resize(sorted_row_ids_.size(), true);
+ if (cpu_filter.count() < cpu_filter.size()) {
+ for (size_t i = 0; i < sorted_row_ids_.size(); i++) {
+ row_filter_[i] = cpu_filter.test(slices.cpus()[sorted_row_ids_[i]]);
}
- uint64_t lower_bound = std::min(largest_ts_before, smallest_ts_after);
- min_ts = std::max(min_ts, lower_bound);
- } // if (ts_lower_bound)
-
- // Setup CPU filtering because the trace storage is indexed by CPU.
- for (uint32_t cpu = 0; cpu < base::kMaxCpus; cpu++) {
- if (!cpu_filter.test(cpu))
- continue;
- uint64_t ts_clip_min = ts_clip ? min_ts : 0;
- uint64_t ts_clip_max = ts_clip ? max_ts : kUint64Max;
- StateForCpu(cpu)->Initialize(
- cpu, storage_, quantum_, ts_clip_min, ts_clip_max,
- CreateSortedIndexVectorForCpu(cpu, min_ts, max_ts));
}
-
- // Set the cpu index to be the first item to look at.
- FindCpuWithNextSlice();
+ FindNextRowAndTimestamp();
}
-void SchedSliceTable::FilterState::FindCpuWithNextSlice() {
- next_cpu_ = base::kMaxCpus;
-
- for (uint32_t cpu = 0; cpu < base::kMaxCpus; cpu++) {
- const auto& cpu_state = per_cpu_state_[cpu];
- if (!cpu_state.IsNextRowIdIndexValid())
- continue;
-
- // The first CPU with a valid slice can be set to the next CPU.
- if (next_cpu_ == base::kMaxCpus) {
- next_cpu_ = cpu;
- continue;
- }
-
- // If the current CPU is ordered before the current "next" CPU, then update
- // the cpu value.
- int cmp = CompareCpuToNextCpu(cpu);
- if (cmp < 0)
- next_cpu_ = cpu;
- }
-}
-
-int SchedSliceTable::FilterState::CompareCpuToNextCpu(uint32_t cpu) {
- size_t next_row = per_cpu_state_[next_cpu_].next_row_id();
- size_t row = per_cpu_state_[cpu].next_row_id();
- return CompareSlices(cpu, row, next_cpu_, next_row);
-}
-
-std::vector<uint32_t>
-SchedSliceTable::FilterState::CreateSortedIndexVectorForCpu(uint32_t cpu,
- uint64_t min_ts,
- uint64_t max_ts) {
- const auto& slices = storage_->SlicesForCpu(cpu);
+void SchedSliceTable::FilterState::SetupSortedRowIds(uint64_t min_ts,
+ uint64_t max_ts) {
+ const auto& slices = storage_->slices();
const auto& start_ns = slices.start_ns();
PERFETTO_CHECK(slices.slice_count() <= std::numeric_limits<uint32_t>::max());
@@ -411,26 +230,22 @@
ptrdiff_t dist = std::distance(min_it, max_it);
PERFETTO_CHECK(dist >= 0 && static_cast<size_t>(dist) <= start_ns.size());
- std::vector<uint32_t> indices(static_cast<size_t>(dist));
-
// Fill |indices| with the consecutive row numbers affected by the filtering.
- std::iota(indices.begin(), indices.end(),
+ sorted_row_ids_.resize(static_cast<size_t>(dist));
+ std::iota(sorted_row_ids_.begin(), sorted_row_ids_.end(),
std::distance(start_ns.begin(), min_it));
- // In other cases, sort by the given criteria.
- std::sort(indices.begin(), indices.end(),
- [this, cpu](uint32_t f, uint32_t s) {
- return CompareSlices(cpu, f, cpu, s) < 0;
- });
- return indices;
+ // Sort only if there are any order by constraints.
+ if (!order_by_.empty()) {
+ std::sort(
+ sorted_row_ids_.begin(), sorted_row_ids_.end(),
+ [this](uint32_t f, uint32_t s) { return CompareSlices(f, s) < 0; });
+ }
}
-int SchedSliceTable::FilterState::CompareSlices(uint32_t f_cpu,
- size_t f_idx,
- uint32_t s_cpu,
- size_t s_idx) {
+int SchedSliceTable::FilterState::CompareSlices(size_t f_idx, size_t s_idx) {
for (const auto& ob : order_by_) {
- int c = CompareSlicesOnColumn(f_cpu, f_idx, s_cpu, s_idx, ob);
+ int c = CompareSlicesOnColumn(f_idx, s_idx, ob);
if (c != 0)
return c;
}
@@ -438,83 +253,35 @@
}
int SchedSliceTable::FilterState::CompareSlicesOnColumn(
- uint32_t f_cpu,
size_t f_idx,
- uint32_t s_cpu,
size_t s_idx,
const QueryConstraints::OrderBy& ob) {
- const auto& f_sl = storage_->SlicesForCpu(f_cpu);
- const auto& s_sl = storage_->SlicesForCpu(s_cpu);
+ const auto& sl = storage_->slices();
switch (ob.iColumn) {
- case SchedSliceTable::Column::kQuantum:
- case SchedSliceTable::Column::kTimestampLowerBound:
- PERFETTO_CHECK(false);
case SchedSliceTable::Column::kTimestamp:
- return Compare(f_sl.start_ns()[f_idx], s_sl.start_ns()[s_idx], ob.desc);
+ return Compare(sl.start_ns()[f_idx], sl.start_ns()[s_idx], ob.desc);
case SchedSliceTable::Column::kDuration:
- return Compare(f_sl.durations()[f_idx], s_sl.durations()[s_idx], ob.desc);
+ return Compare(sl.durations()[f_idx], sl.durations()[s_idx], ob.desc);
case SchedSliceTable::Column::kCpu:
- return Compare(f_cpu, s_cpu, ob.desc);
+ return Compare(sl.cpus()[f_idx], sl.cpus()[s_idx], ob.desc);
case SchedSliceTable::Column::kUtid:
- return Compare(f_sl.utids()[f_idx], s_sl.utids()[s_idx], ob.desc);
- case SchedSliceTable::Column::kQuantizedGroup: {
- // We don't support sorting in descending order on quantized group when
- // we have a non-zero quantum.
- PERFETTO_CHECK(!ob.desc || quantum_ == 0);
-
- uint64_t f_timestamp = StateForCpu(f_cpu)->next_timestamp();
- uint64_t s_timestamp = StateForCpu(s_cpu)->next_timestamp();
-
- uint64_t f_group = quantum_ == 0 ? f_timestamp : f_timestamp / quantum_;
- uint64_t s_group = quantum_ == 0 ? s_timestamp : s_timestamp / quantum_;
- return Compare(f_group, s_group, ob.desc);
- }
+ return Compare(sl.utids()[f_idx], sl.utids()[s_idx], ob.desc);
}
PERFETTO_FATAL("Unexpected column %d", ob.iColumn);
}
-void SchedSliceTable::PerCpuState::Initialize(
- uint32_t cpu,
- const TraceStorage* storage,
- uint64_t quantum,
- uint64_t ts_clip_min,
- uint64_t ts_clip_max,
- std::vector<uint32_t> sorted_row_ids) {
- cpu_ = cpu;
- storage_ = storage;
- quantum_ = quantum;
- ts_clip_min_ = ts_clip_min;
- ts_clip_max_ = ts_clip_max;
- sorted_row_ids_ = std::move(sorted_row_ids);
- UpdateNextTimestampForNextRow();
+void SchedSliceTable::FilterState::FindNextSlice() {
+ next_row_id_index_++;
+ FindNextRowAndTimestamp();
}
-void SchedSliceTable::PerCpuState::FindNextSlice() {
- PERFETTO_DCHECK(next_timestamp_ != 0);
-
- const auto& slices = Slices();
- if (quantum_ == 0) {
- next_row_id_index_++;
- UpdateNextTimestampForNextRow();
- return;
- }
-
- uint64_t start_group = next_timestamp_ / quantum_;
- uint64_t end_slice =
- slices.start_ns()[next_row_id()] + slices.durations()[next_row_id()];
- uint64_t next_group_start = (start_group + 1) * quantum_;
-
- if (next_group_start >= end_slice) {
- next_row_id_index_++;
- UpdateNextTimestampForNextRow();
- } else {
- next_timestamp_ = next_group_start;
- }
-}
-
-void SchedSliceTable::PerCpuState::UpdateNextTimestampForNextRow() {
- next_timestamp_ =
- IsNextRowIdIndexValid() ? Slices().start_ns()[next_row_id()] : 0;
+void SchedSliceTable::FilterState::FindNextRowAndTimestamp() {
+ auto start =
+ row_filter_.begin() +
+ static_cast<decltype(row_filter_)::difference_type>(next_row_id_index_);
+ auto next_it = std::find(start, row_filter_.end(), true);
+ next_row_id_index_ =
+ static_cast<uint32_t>(std::distance(row_filter_.begin(), next_it));
}
} // namespace trace_processor
diff --git a/src/trace_processor/sched_slice_table.h b/src/trace_processor/sched_slice_table.h
index 4e86336..582dded 100644
--- a/src/trace_processor/sched_slice_table.h
+++ b/src/trace_processor/sched_slice_table.h
@@ -37,13 +37,7 @@
kTimestamp = 0,
kCpu = 1,
kDuration = 2,
- kQuantizedGroup = 3,
- kUtid = 4,
-
- // Hidden columns.
- kQuantum = 5,
- kTimestampLowerBound = 6,
- kClipTimestamp = 7,
+ kUtid = 3,
};
SchedSliceTable(sqlite3*, const TraceStorage* storage);
@@ -54,60 +48,8 @@
std::string CreateTableStmt(int argc, const char* const* argv) override;
std::unique_ptr<Table::Cursor> CreateCursor() override;
int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
- int FindFunction(const char* name, FindFunctionFn fn, void** args) override;
private:
- // Transient filter state for each CPU of this trace.
- class PerCpuState {
- public:
- void Initialize(uint32_t cpu,
- const TraceStorage* storage,
- uint64_t quantum,
- uint64_t ts_clip_min,
- uint64_t ts_clip_max,
- std::vector<uint32_t> sorted_row_ids);
- void FindNextSlice();
- bool IsNextRowIdIndexValid() const {
- return next_row_id_index_ < sorted_row_ids_.size();
- }
-
- size_t next_row_id() const { return sorted_row_ids_[next_row_id_index_]; }
- uint64_t next_timestamp() const { return next_timestamp_; }
- uint64_t ts_clip_min() const { return ts_clip_min_; }
- uint64_t ts_clip_max() const { return ts_clip_max_; }
-
- private:
- const TraceStorage::SlicesPerCpu& Slices() {
- return storage_->SlicesForCpu(cpu_);
- }
-
- void UpdateNextTimestampForNextRow();
-
- // Vector of row ids sorted by the the given order by constraints.
- std::vector<uint32_t> sorted_row_ids_;
-
- // An offset into |sorted_row_ids_| indicating the next row to return.
- uint32_t next_row_id_index_ = 0;
-
- // The timestamp of the row to index. This is either the timestamp of
- // the slice at |next_row_id_index_| or the timestamp of the next quantized
- // group boundary.
- uint64_t next_timestamp_ = 0;
-
- // The CPU this state is associated with.
- uint32_t cpu_ = 0;
-
- // The quantum the output slices should fall within.
- uint64_t quantum_ = 0;
-
- // When clipping is applied (i.e. WHERE ts_clip between X and Y), slices are
- // cut and shrunk around the min-max boundaries to fit in the clip window.
- uint64_t ts_clip_min_ = 0;
- uint64_t ts_clip_max_ = std::numeric_limits<uint64_t>::max();
-
- const TraceStorage* storage_ = nullptr;
- };
-
// Transient state for a filter operation on a Cursor.
class FilterState {
public:
@@ -115,57 +57,43 @@
const QueryConstraints& query_constraints,
sqlite3_value** argv);
- // Chooses the next CPU which should be returned according to the sorting
- // criteria specified by |order_by_|.
- void FindCpuWithNextSlice();
+ void FindNextSlice();
- // Returns whether the next CPU to be returned by this filter operation is
- // valid.
- bool IsNextCpuValid() const { return next_cpu_ < per_cpu_state_.size(); }
+ inline bool IsNextRowIdIndexValid() const {
+ return next_row_id_index_ < sorted_row_ids_.size();
+ }
- // Returns the transient state associated with a single CPU.
- PerCpuState* StateForCpu(uint32_t cpu) { return &per_cpu_state_[cpu]; }
-
- uint32_t next_cpu() const { return next_cpu_; }
- uint64_t quantum() const { return quantum_; }
+ size_t next_row_id() const { return sorted_row_ids_[next_row_id_index_]; }
private:
- // Creates a vector of indices into the slices for the given |cpu| sorted
- // by the order by criteria.
- std::vector<uint32_t> CreateSortedIndexVectorForCpu(uint32_t cpu,
- uint64_t min_ts,
- uint64_t max_ts);
+ // Updates |sorted_row_ids_| with the indices into the slices sorted by the
+ // order by criteria.
+ void SetupSortedRowIds(uint64_t min_ts, uint64_t max_ts);
- // Compares the next slice of the given |cpu| with the next slice of the
- // |next_cpu_|. Return <0 if |cpu| is ordered before, >0 if ordered after,
- // and 0 if they are equal.
- int CompareCpuToNextCpu(uint32_t cpu);
-
- // Compares the slice at index |f| in |f_slices| for CPU |f_cpu| with the
- // slice at index |s| in |s_slices| for CPU |s_cpu| on all columns.
+ // Compares the slice at index |f| with the slice at index |s| on all
+ // columns.
// Returns -1 if the first slice is before the second in the ordering, 1 if
// the first slice is after the second and 0 if they are equal.
- int CompareSlices(uint32_t f_cpu, size_t f, uint32_t s_cpu, size_t s);
+ int CompareSlices(size_t f, size_t s);
- // Compares the slice at index |f| in |f_slices| for CPU |f_cpu| with the
- // slice at index |s| in |s_slices| for CPU |s_cpu| on the criteria in
- // |order_by|.
+ // Compares the slice at index |f| with the slice at index |s| on the
+ // criteria in |order_by|.
// Returns -1 if the first slice is before the second in the ordering, 1 if
// the first slice is after the second and 0 if they are equal.
- int CompareSlicesOnColumn(uint32_t f_cpu,
- size_t f,
- uint32_t s_cpu,
+ int CompareSlicesOnColumn(size_t f,
size_t s,
const QueryConstraints::OrderBy& order_by);
- // One entry for each cpu which is used in filtering.
- std::array<PerCpuState, base::kMaxCpus> per_cpu_state_;
+ void FindNextRowAndTimestamp();
- // The next CPU which should be returned to the user.
- uint32_t next_cpu_ = 0;
+ // Vector of row ids sorted by the given order by constraints.
+ std::vector<uint32_t> sorted_row_ids_;
- // The quantum the output slices should fall within.
- uint64_t quantum_ = 0;
+ // Per-entry flags parallel to |sorted_row_ids_|; false entries are skipped.
+ std::vector<bool> row_filter_;
+
+ // An offset into |sorted_row_ids_| indicating the next row to return.
+ uint32_t next_row_id_index_ = 0;
// The sorting criteria for this filter operation.
std::vector<QueryConstraints::OrderBy> order_by_;
diff --git a/src/trace_processor/sched_slice_table_unittest.cc b/src/trace_processor/sched_slice_table_unittest.cc
index 31c215f..1f3cc18 100644
--- a/src/trace_processor/sched_slice_table_unittest.cc
+++ b/src/trace_processor/sched_slice_table_unittest.cc
@@ -169,124 +169,6 @@
ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
}
-TEST_F(SchedSliceTableTest, QuanitsiationCpuNativeOrder) {
- uint32_t cpu_1 = 3;
- uint32_t cpu_2 = 8;
- uint64_t timestamp = 100;
- uint32_t pid_1 = 2;
- uint32_t prev_state = 32;
- static const char kCommProc1[] = "process1";
- static const char kCommProc2[] = "process2";
- uint32_t pid_2 = 4;
- context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp, pid_1, prev_state,
- kCommProc1, pid_2);
- context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp + 3, pid_2,
- prev_state, kCommProc2, pid_1);
- context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 4, pid_1,
- prev_state, kCommProc1, pid_2);
- context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp + 10, pid_2,
- prev_state, kCommProc2, pid_1);
-
- PrepareValidStatement(
- "SELECT dur, ts, cpu FROM sched WHERE quantum = 5 ORDER BY cpu");
-
- // Event at ts + 3 sliced off at quantum boundary (105).
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 2 /* duration */);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 1), timestamp + 3);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 2), cpu_1);
-
- // Remainder of event at ts + 3 after quantum boundary (105 onwards).
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 5 /* duration */);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 1), timestamp + 5);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 2), cpu_1);
-
- // Full event at ts.
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 4 /* duration */);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 1), timestamp);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 2), cpu_2);
-
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
-}
-
-TEST_F(SchedSliceTableTest, QuantizationSqliteDurationOrder) {
- uint32_t cpu_1 = 3;
- uint32_t cpu_2 = 8;
- uint64_t timestamp = 100;
- uint32_t pid_1 = 2;
- uint32_t prev_state = 32;
- static const char kCommProc1[] = "process1";
- static const char kCommProc2[] = "process2";
- uint32_t pid_2 = 4;
- context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp, pid_1, prev_state,
- kCommProc1, pid_2);
- context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 3, pid_2,
- prev_state, kCommProc2, pid_1);
- context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp + 4, pid_1,
- prev_state, kCommProc1, pid_2);
- context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 10, pid_2,
- prev_state, kCommProc2, pid_1);
-
- PrepareValidStatement(
- "SELECT dur, ts, cpu FROM sched WHERE quantum = 5 ORDER BY dur");
-
- // Event at ts + 3 sliced off at quantum boundary (105).
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 2 /* duration */);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 1), timestamp + 3);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 2), cpu_2);
-
- // Full event at ts.
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 4 /* duration */);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 1), timestamp);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 2), cpu_1);
-
- // Remainder of event at ts + 3 after quantum boundary (105 onwards).
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 5 /* duration */);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 1), timestamp + 5);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 2), cpu_2);
-
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
-}
-
-TEST_F(SchedSliceTableTest, QuantizationGroupAndSum) {
- uint32_t cpu_1 = 3;
- uint32_t cpu_2 = 8;
- uint64_t timestamp = 100;
- uint32_t pid_1 = 2;
- uint32_t prev_state = 32;
- static const char kCommProc1[] = "process1";
- static const char kCommProc2[] = "process2";
- uint32_t pid_2 = 4;
- context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp, pid_1, prev_state,
- kCommProc1, pid_2);
- context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 3, pid_2,
- prev_state, kCommProc2, pid_1);
- context_.sched_tracker->PushSchedSwitch(cpu_1, timestamp + 4, pid_1,
- prev_state, kCommProc1, pid_2);
- context_.sched_tracker->PushSchedSwitch(cpu_2, timestamp + 10, pid_2,
- prev_state, kCommProc2, pid_1);
-
- PrepareValidStatement(
- "SELECT SUM(dur) as sum_dur "
- "FROM sched "
- "WHERE quantum = 5 "
- "GROUP BY quantized_group "
- "ORDER BY sum_dur");
-
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 5 /* SUM(duration) */);
-
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_ROW);
- ASSERT_EQ(sqlite3_column_int64(*stmt_, 0), 6 /* SUM(duration) */);
-
- ASSERT_EQ(sqlite3_step(*stmt_), SQLITE_DONE);
-}
-
TEST_F(SchedSliceTableTest, UtidTest) {
uint32_t cpu = 3;
uint64_t timestamp = 100;
@@ -349,21 +231,6 @@
ASSERT_THAT(query("ts >= 55 and ts < 52"), IsEmpty());
ASSERT_THAT(query("ts >= 70 and ts < 71"), ElementsAre(70));
ASSERT_THAT(query("ts >= 59 and ts < 73"), ElementsAre(59, 60, 70, 71, 72));
-
- // Test the special ts_lower_bound column.
- ASSERT_THAT(query("ts_lower_bound = 1 and ts < 10"), IsEmpty());
- ASSERT_THAT(query("ts_lower_bound = 50 and ts <= 50"), ElementsAre(50));
- ASSERT_THAT(query("ts_lower_bound = 100"), ElementsAre(80));
- ASSERT_THAT(query("ts_lower_bound = 100 and cpu = 5"), ElementsAre(60));
- ASSERT_THAT(query("ts_lower_bound = 100 and cpu = 7"), ElementsAre(80));
- ASSERT_THAT(query("ts_lower_bound = 1 and ts <= 52"),
- ElementsAre(50, 51, 52));
- ASSERT_THAT(query("ts_lower_bound = 70 and ts <= 71"),
- ElementsAre(60, 70, 71));
- ASSERT_THAT(query("ts_lower_bound = 60 and ts > 58 and ts <= 71"),
- ElementsAre(59, 60, 70, 71));
- ASSERT_THAT(query("ts_lower_bound = 70 and ts > 70 and ts <= 71"),
- ElementsAre(71));
}
} // namespace
diff --git a/src/trace_processor/sched_tracker_unittest.cc b/src/trace_processor/sched_tracker_unittest.cc
index dbd94e6..a8a70c3 100644
--- a/src/trace_processor/sched_tracker_unittest.cc
+++ b/src/trace_processor/sched_tracker_unittest.cc
@@ -49,7 +49,7 @@
static const char kCommProc2[] = "process2";
uint32_t pid_2 = 4;
- const auto& timestamps = context.storage->SlicesForCpu(cpu).start_ns();
+ const auto& timestamps = context.storage->slices().start_ns();
context.sched_tracker->PushSchedSwitch(cpu, timestamp, pid_1, prev_state,
kCommProc1, pid_2);
ASSERT_EQ(timestamps.size(), 0);
@@ -63,7 +63,7 @@
ASSERT_EQ(std::string(context.storage->GetString(
context.storage->GetThread(1).name_id)),
kCommProc2);
- ASSERT_EQ(context.storage->SlicesForCpu(cpu).utids().front(), 1);
+ ASSERT_EQ(context.storage->slices().utids().front(), 1);
}
TEST_F(SchedTrackerTest, InsertThirdSched_SameThread) {
@@ -73,7 +73,7 @@
static const char kCommProc1[] = "process1";
static const char kCommProc2[] = "process2";
- const auto& timestamps = context.storage->SlicesForCpu(cpu).start_ns();
+ const auto& timestamps = context.storage->slices().start_ns();
context.sched_tracker->PushSchedSwitch(cpu, timestamp, /*tid=*/4, prev_state,
kCommProc1,
/*tid=*/2);
@@ -92,11 +92,11 @@
ASSERT_EQ(timestamps.size(), 3ul);
ASSERT_EQ(timestamps[0], timestamp);
ASSERT_EQ(context.storage->GetThread(1).start_ns, timestamp);
- ASSERT_EQ(context.storage->SlicesForCpu(cpu).durations().at(0), 1u);
- ASSERT_EQ(context.storage->SlicesForCpu(cpu).durations().at(1), 11u - 1u);
- ASSERT_EQ(context.storage->SlicesForCpu(cpu).durations().at(2), 31u - 11u);
- ASSERT_EQ(context.storage->SlicesForCpu(cpu).utids().at(0),
- context.storage->SlicesForCpu(cpu).utids().at(2));
+ ASSERT_EQ(context.storage->slices().durations().at(0), 1u);
+ ASSERT_EQ(context.storage->slices().durations().at(1), 11u - 1u);
+ ASSERT_EQ(context.storage->slices().durations().at(2), 31u - 11u);
+ ASSERT_EQ(context.storage->slices().utids().at(0),
+ context.storage->slices().utids().at(2));
}
TEST_F(SchedTrackerTest, CounterDuration) {
diff --git a/src/trace_processor/span_operator_table.cc b/src/trace_processor/span_operator_table.cc
new file mode 100644
index 0000000..55ee520
--- /dev/null
+++ b/src/trace_processor/span_operator_table.cc
@@ -0,0 +1,463 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/span_operator_table.h"
+
+#include <sqlite3.h>
+#include <string.h>
+#include <algorithm>
+#include <set>
+
+#include "perfetto/base/logging.h"
+#include "src/trace_processor/sqlite_utils.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+namespace {
+
+using namespace sqlite_utils;
+
+constexpr uint64_t kU64Max = std::numeric_limits<uint64_t>::max();
+
+// Returns the (name, declared type) schema of |raw_table_name| as reported by
+// pragma_table_info.
+//
+// Returns an empty vector if the schema query cannot be prepared, if stepping
+// it fails, or if any column has an empty name or type. Hits PERFETTO_FATAL
+// for declared types other than UNSIGNED BIG INT, UNSIGNED INT and TEXT.
+std::vector<SpanOperatorTable::ColumnDefinition> GetColumnsForTable(
+    sqlite3* db,
+    const std::string& raw_table_name) {
+  // Support names which are table valued functions with arguments.
+  std::string table_name = raw_table_name.substr(0, raw_table_name.find('('));
+
+  // Build the query as a std::string rather than snprintf-ing into a fixed
+  // buffer: a truncated snprintf still returns the untruncated length, which
+  // was then passed to sqlite3_prepare_v2 and read past the buffer's end.
+  std::string sql =
+      "SELECT name, type from pragma_table_info(\"" + table_name + "\")";
+
+  sqlite3_stmt* raw_stmt = nullptr;
+  int err = sqlite3_prepare_v2(db, sql.c_str(), static_cast<int>(sql.size()),
+                               &raw_stmt, nullptr);
+  ScopedStmt stmt(raw_stmt);
+  if (err != SQLITE_OK) {
+    PERFETTO_ELOG("Preparing schema query for table %s failed",
+                  raw_table_name.c_str());
+    return {};
+  }
+  PERFETTO_DCHECK(sqlite3_column_count(*stmt) == 2);
+
+  std::vector<SpanOperatorTable::ColumnDefinition> columns;
+  while (true) {
+    err = sqlite3_step(*stmt);
+    if (err == SQLITE_DONE)
+      break;
+    if (err != SQLITE_ROW) {
+      PERFETTO_ELOG("Querying schema of table failed");
+      return {};
+    }
+
+    const char* name =
+        reinterpret_cast<const char*>(sqlite3_column_text(*stmt, 0));
+    const char* type =
+        reinterpret_cast<const char*>(sqlite3_column_text(*stmt, 1));
+    if (!name || !type || !*name || !*type) {
+      PERFETTO_ELOG("Schema has invalid column values");
+      return {};
+    }
+
+    SpanOperatorTable::ColumnDefinition column;
+    column.name = name;
+    column.type_name = type;
+
+    // Declared types are matched case-insensitively. The cast to unsigned
+    // char is required: passing a negative char to toupper is undefined.
+    std::transform(column.type_name.begin(), column.type_name.end(),
+                   column.type_name.begin(), [](char c) {
+                     return static_cast<char>(
+                         ::toupper(static_cast<unsigned char>(c)));
+                   });
+    if (column.type_name == "UNSIGNED BIG INT") {
+      column.type = SpanOperatorTable::Value::Type::kULong;
+    } else if (column.type_name == "UNSIGNED INT") {
+      column.type = SpanOperatorTable::Value::Type::kUInt;
+    } else if (column.type_name == "TEXT") {
+      column.type = SpanOperatorTable::Value::Type::kText;
+    } else {
+      PERFETTO_FATAL("Unknown column type on table %s", raw_table_name.c_str());
+    }
+    columns.emplace_back(std::move(column));
+  }
+  return columns;
+}
+
+}  // namespace
+
+// The TraceStorage argument is ignored: the span table only reads other SQLite
+// tables through |db|.
+SpanOperatorTable::SpanOperatorTable(sqlite3* db, const TraceStorage*)
+    : db_(db) {}
+
+// Registers the module under the name "span", so that it can be instantiated
+// with "CREATE VIRTUAL TABLE ... USING span(t1, t2, join_col)".
+void SpanOperatorTable::RegisterTable(sqlite3* db,
+                                      const TraceStorage* storage) {
+  Table::Register<SpanOperatorTable>(db, storage, "span");
+}
+
+// Builds the CREATE TABLE statement describing the joined table's schema:
+// ts, dur and the join column first, followed by the pass-through columns of
+// table 1 and then table 2. Returns "" if too few module arguments are given.
+std::string SpanOperatorTable::CreateTableStmt(int argc,
+                                               const char* const* argv) {
+  // SQLite always fills argv[0] - argv[2]; the user's arguments follow.
+  if (argc < 6) {
+    PERFETTO_ELOG("SPAN JOIN expected at least 3 args, received %d", argc - 3);
+    return "";
+  }
+
+  // The user's arguments are, in order: (t1_name, t2_name, join_col).
+  t1_defn_.name = argv[3];
+  t1_defn_.cols = GetColumnsForTable(db_, t1_defn_.name);
+  t2_defn_.name = argv[4];
+  t2_defn_.cols = GetColumnsForTable(db_, t2_defn_.name);
+  join_col_ = argv[5];
+
+  // TODO(lalitm): add logic to ensure that the tables that are being joined
+  // are actually valid to be joined i.e. they have the ts and dur columns and
+  // have the join column.
+
+  // ts, dur and the join column are emitted once as the shared leading
+  // columns, so strip them from each child's pass-through columns.
+  auto is_shared_column = [this](const ColumnDefinition& col) {
+    return col.name == "ts" || col.name == "dur" || col.name == join_col_;
+  };
+  for (TableDefinition* defn : {&t1_defn_, &t2_defn_}) {
+    std::vector<ColumnDefinition>& cols = defn->cols;
+    cols.erase(std::remove_if(cols.begin(), cols.end(), is_shared_column),
+               cols.end());
+  }
+
+  std::string create_stmt =
+      "CREATE TABLE x(ts UNSIGNED BIG INT, dur UNSIGNED BIG INT, " + join_col_ +
+      " UNSIGNED INT, ";
+  for (const TableDefinition* defn : {&t1_defn_, &t2_defn_}) {
+    for (const ColumnDefinition& col : defn->cols)
+      create_stmt += col.name + " " + col.type_name + ", ";
+  }
+  create_stmt += "PRIMARY KEY(ts, " + join_col_ + ")) WITHOUT ROWID;";
+  PERFETTO_DLOG("Create statement: %s", create_stmt.c_str());
+  return create_stmt;
+}
+
+// Hands SQLite a new cursor which reads the child tables through db_.
+std::unique_ptr<Table::Cursor> SpanOperatorTable::CreateCursor() {
+  std::unique_ptr<Table::Cursor> cursor(new Cursor(this, db_));
+  return cursor;
+}
+
+// Leaves |info| untouched and accepts whatever plan SQLite proposes.
+int SpanOperatorTable::BestIndex(const QueryConstraints&, BestIndexInfo*) {
+  // TODO(lalitm): figure out cost estimation.
+  return SQLITE_OK;
+}
+
+// Maps a joined-table column index (at or beyond the reserved columns) to the
+// child table owning it: table 1's pass-through columns come first, then
+// table 2's. CHECK-fails on indices outside both ranges.
+std::pair<bool, size_t> SpanOperatorTable::GetTableAndColumnIndex(
+    int joined_column_idx) {
+  PERFETTO_CHECK(joined_column_idx >= kReservedColumns);
+
+  const size_t passthrough_idx =
+      static_cast<size_t>(joined_column_idx - kReservedColumns);
+  const size_t t1_count = t1_defn_.cols.size();
+  if (passthrough_idx >= t1_count) {
+    const size_t t2_idx = passthrough_idx - t1_count;
+    PERFETTO_CHECK(t2_idx < t2_defn_.cols.size());
+    return std::make_pair(false, t2_idx);
+  }
+  return std::make_pair(true, passthrough_idx);
+}
+
+// The cursor only borrows |table| and |db|; both must outlive it.
+SpanOperatorTable::Cursor::Cursor(SpanOperatorTable* table, sqlite3* db)
+    : db_(db), table_(table) {}
+
+SpanOperatorTable::Cursor::~Cursor() = default;
+
+// Prepares "SELECT ts, dur, <join col>, <def's columns> FROM <def.name>
+// WHERE ... ORDER BY ts" over one child table (table 1 iff |is_t1|).
+//
+// Constraints on ts, dur and the join column are forwarded to both children.
+// Constraints on pass-through columns are forwarded only to the child owning
+// the column. Returns the sqlite3_prepare_v2 / sqlite3_bind_value error code
+// on failure. *stmt may be non-null even on failure, so the caller must still
+// take ownership of it.
+int SpanOperatorTable::Cursor::PrepareRawStmt(const QueryConstraints& qc,
+                                              sqlite3_value** argv,
+                                              const TableDefinition& def,
+                                              bool is_t1,
+                                              sqlite3_stmt** stmt) {
+  // TODO(lalitm): pass through constraints on other tables to those tables.
+  std::string sql;
+  sql += "SELECT ts, dur, " + table_->join_col_;
+  for (const auto& col : def.cols) {
+    sql += ", " + col.name;
+  }
+  sql += " FROM " + def.name;
+  sql += " WHERE 1";
+
+  // Constraint values are bound as parameters instead of being spliced into
+  // the SQL as sqlite3_value_text(): splicing left string values unquoted
+  // (invalid SQL, or SQL injection) and concatenated a nullptr for NULLs.
+  std::vector<sqlite3_value*> bound_values;
+  for (size_t i = 0; i < qc.constraints().size(); i++) {
+    const auto& constraint = qc.constraints()[i];
+    int c = constraint.iColumn;
+    std::string col_name;
+    if (c == Column::kTimestamp) {
+      col_name = "ts";
+    } else if (c == Column::kDuration) {
+      col_name = "dur";
+    } else if (c == Column::kJoinValue) {
+      col_name = table_->join_col_;
+    } else {
+      auto index_pair = table_->GetTableAndColumnIndex(c);
+      bool is_constraint_in_current_table = index_pair.first == is_t1;
+      if (is_constraint_in_current_table) {
+        col_name = def.cols[index_pair.second].name;
+      }
+    }
+
+    if (!col_name.empty()) {
+      sql += " AND " + col_name + OpToString(constraint.op) + "?";
+      bound_values.push_back(argv[i]);
+    }
+  }
+  sql += " ORDER BY ts;";
+
+  PERFETTO_DLOG("%s", sql.c_str());
+  int sql_size = static_cast<int>(sql.size());
+  int err = sqlite3_prepare_v2(db_, sql.c_str(), sql_size, stmt, nullptr);
+  if (err != SQLITE_OK)
+    return err;
+
+  // SQL parameters are 1-indexed.
+  for (size_t i = 0; i < bound_values.size(); i++) {
+    err = sqlite3_bind_value(*stmt, static_cast<int>(i + 1), bound_values[i]);
+    if (err != SQLITE_OK)
+      return err;
+  }
+  return SQLITE_OK;
+}
+
+// Starts a new scan: prepares one ts-ordered query per child table, then
+// builds a fresh FilterState from them (dropping any previous one) and primes
+// it with the first joined row.
+int SpanOperatorTable::Cursor::Filter(const QueryConstraints& qc,
+                                      sqlite3_value** argv) {
+  // Each raw statement is wrapped immediately so early returns release it.
+  sqlite3_stmt* raw_t1 = nullptr;
+  int status = PrepareRawStmt(qc, argv, table_->t1_defn_, true, &raw_t1);
+  ScopedStmt t1_stmt(raw_t1);
+  if (status != SQLITE_OK)
+    return status;
+
+  sqlite3_stmt* raw_t2 = nullptr;
+  status = PrepareRawStmt(qc, argv, table_->t2_defn_, false, &raw_t2);
+  ScopedStmt t2_stmt(raw_t2);
+  if (status != SQLITE_OK)
+    return status;
+
+  FilterState* state =
+      new FilterState(table_, std::move(t1_stmt), std::move(t2_stmt));
+  filter_state_.reset(state);
+  return filter_state_->Initialize();
+}
+
+// Iteration is delegated entirely to the FilterState created by Filter(),
+// which SQLite invokes before iterating the cursor.
+int SpanOperatorTable::Cursor::Next() {
+  FilterState* state = filter_state_.get();
+  return state->Next();
+}
+
+int SpanOperatorTable::Cursor::Eof() {
+  FilterState* state = filter_state_.get();
+  return state->Eof();
+}
+
+int SpanOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
+  FilterState* state = filter_state_.get();
+  return state->Column(context, N);
+}
+
+// Takes ownership of the two prepared (not yet stepped) child statements.
+SpanOperatorTable::FilterState::FilterState(SpanOperatorTable* table,
+                                            ScopedStmt t1_stmt,
+                                            ScopedStmt t2_stmt)
+    : table_(table) {
+  // TableState has no constructor accepting a statement, so the statements
+  // are moved into the already default-initialized states.
+  t1_.stmt = std::move(t1_stmt);
+  t2_.stmt = std::move(t2_stmt);
+}
+
+// Steps each child statement onto its first row, then computes the first
+// joined row via Next(). Returns SQLITE_ERROR if either first step fails.
+int SpanOperatorTable::FilterState::Initialize() {
+  // Records the first row's timestamp and the column count of |state|'s
+  // statement. An empty child leaves latest_ts at its "exhausted" default.
+  auto prime = [](TableState* state) -> int {
+    sqlite3_stmt* stmt = state->stmt.get();
+    int res = sqlite3_step(stmt);
+    if (res == SQLITE_DONE)
+      return SQLITE_OK;
+    if (res != SQLITE_ROW)
+      return SQLITE_ERROR;
+    state->latest_ts =
+        static_cast<uint64_t>(sqlite3_column_int64(stmt, Column::kTimestamp));
+    state->col_count = static_cast<size_t>(sqlite3_column_count(stmt));
+    return SQLITE_OK;
+  };
+
+  if (prime(&t1_) != SQLITE_OK)
+    return SQLITE_ERROR;
+  if (prime(&t2_) != SQLITE_OK)
+    return SQLITE_ERROR;
+  return Next();
+}
+
+// Advances to the next intersecting span.
+//
+// While the children still have rows, the previously returned span is
+// dropped and ExtractNext() is called repeatedly until it queues a new
+// intersection (it queues at most one per call). Once both child statements
+// are exhausted, the latest stored span of every join value present in both
+// tables is intersected one final time. After that, Next() only drains the
+// queue. Returns SQLITE_OK, or the error code reported by ExtractNext().
+int SpanOperatorTable::FilterState::Next() {
+  PERFETTO_DCHECK(!intersecting_spans_.empty() || children_have_more_);
+
+  // If there are no more rows to be added from the child tables, simply pop
+  // the front of the queue and return.
+  if (!children_have_more_) {
+    intersecting_spans_.pop_front();
+    return SQLITE_OK;
+  }
+
+  // Remove the previously returned span but also try and find more
+  // intersections. The queue is still empty on the first call, which comes
+  // from Initialize().
+  if (!intersecting_spans_.empty())
+    intersecting_spans_.pop_front();
+
+  // Pull from whichever cursor has the earlier timestamp and return if there
+  // is a valid span. A latest_ts of kU64Max marks an exhausted child; ties
+  // pull from table 1.
+  while (t1_.latest_ts < kU64Max || t2_.latest_ts < kU64Max) {
+    int err = ExtractNext(t1_.latest_ts <= t2_.latest_ts);
+    if (err == SQLITE_ROW) {
+      return SQLITE_OK;
+    } else if (err != SQLITE_DONE) {
+      return err;
+    }
+  }
+
+  // Once both cursors are completely exhausted, do one last pass through the
+  // tables and return any final intersecting spans. The stored spans can be
+  // moved out because they are never read again after this pass.
+  for (auto it = t1_.spans.begin(); it != t1_.spans.end(); it++) {
+    auto join_val = it->first;
+    auto t2_it = t2_.spans.find(join_val);
+    if (t2_it == t2_.spans.end())
+      continue;
+    MaybeAddIntersectingSpan(join_val, std::move(it->second),
+                             std::move(t2_it->second));
+  }
+
+  // We don't have any more items to yield.
+  children_have_more_ = false;
+  return SQLITE_OK;
+}
+
+// Consumes the current row of one child statement (table 1 iff |pull_t1|).
+// The row replaces that table's stored span for the row's join value, and the
+// statement is advanced. The span that was replaced is then intersected with
+// the other table's stored span for the same join value.
+//
+// Returns SQLITE_ROW if an intersecting span was queued, SQLITE_DONE if not,
+// or the sqlite3_step() error code if advancing the statement failed.
+PERFETTO_ALWAYS_INLINE int SpanOperatorTable::FilterState::ExtractNext(
+    bool pull_t1) {
+  // Decide which table we will be retrieving a row from.
+  TableState* pull_table = pull_t1 ? &t1_ : &t2_;
+
+  // Extract the timestamp, duration and join value from that table.
+  sqlite3_stmt* stmt = pull_table->stmt.get();
+  int64_t ts = sqlite3_column_int64(stmt, Column::kTimestamp);
+  int64_t dur = sqlite3_column_int64(stmt, Column::kDuration);
+  int64_t join_val = sqlite3_column_int64(stmt, Column::kJoinValue);
+
+  // Look up the stored span for this join value (default-inserted on first
+  // use by std::map::operator[]).
+  auto* pull_span = &pull_table->spans[join_val];
+
+  // Save the old span (to allow us to return it) and then update the data in
+  // the span.
+  Span saved_span = std::move(*pull_span);
+  pull_span->ts = static_cast<uint64_t>(ts);
+  pull_span->dur = static_cast<uint64_t>(dur);
+  pull_span->values.resize(pull_table->col_count - kReservedColumns);
+
+  // Copy the pass-through columns, converting each according to the type
+  // declared in the child table's schema.
+  const auto& table_desc = pull_t1 ? table_->t1_defn_ : table_->t2_defn_;
+  int col_count = static_cast<int>(pull_table->col_count);
+  for (int i = kReservedColumns; i < col_count; i++) {
+    size_t off = static_cast<size_t>(i - kReservedColumns);
+
+    Value* value = &pull_span->values[off];
+    value->type = table_desc.cols[off].type;
+    switch (value->type) {
+      case Value::Type::kULong:
+        value->ulong_value =
+            static_cast<uint64_t>(sqlite3_column_int64(stmt, i));
+        break;
+      case Value::Type::kUInt:
+        value->uint_value = static_cast<uint32_t>(sqlite3_column_int(stmt, i));
+        break;
+      case Value::Type::kText: {
+        // sqlite3_column_text() returns nullptr for NULL values, and
+        // assigning nullptr to a std::string is undefined behaviour.
+        const char* text =
+            reinterpret_cast<const char*>(sqlite3_column_text(stmt, i));
+        value->text_value = text ? text : "";
+        break;
+      }
+    }
+  }
+
+  // Get the next value from whichever table we just updated.
+  int err = sqlite3_step(stmt);
+  switch (err) {
+    case SQLITE_DONE:
+      pull_table->latest_ts = kU64Max;
+      break;
+    case SQLITE_ROW:
+      pull_table->latest_ts =
+          static_cast<uint64_t>(sqlite3_column_int64(stmt, Column::kTimestamp));
+      break;
+    default:
+      return err;
+  }
+
+  // Pair the replaced span with a copy of the other table's stored span (the
+  // stored span stays in the map). Move both into the intersection so their
+  // value vectors are not copied a second time.
+  auto t1_span = pull_t1 ? std::move(saved_span) : t1_.spans[join_val];
+  auto t2_span = pull_t1 ? t2_.spans[join_val] : std::move(saved_span);
+  bool span_added = MaybeAddIntersectingSpan(join_val, std::move(t1_span),
+                                             std::move(t2_span));
+  return span_added ? SQLITE_ROW : SQLITE_DONE;
+}
+
+// Queues the overlap of |t1_span| and |t2_span| for |join_value| and returns
+// true, or returns false without queueing anything when they do not overlap.
+// A span whose end is 0 is never joined; for example, a default-constructed
+// Span (ts = dur = 0) has end 0.
+bool SpanOperatorTable::FilterState::MaybeAddIntersectingSpan(
+    int64_t join_value,
+    Span t1_span,
+    Span t2_span) {
+  const uint64_t t1_end = t1_span.ts + t1_span.dur;
+  const uint64_t t2_end = t2_span.ts + t2_span.dur;
+
+  const bool has_empty_end = t1_end == 0 || t2_end == 0;
+  const bool disjoint = t2_end < t1_span.ts || t1_end < t2_span.ts;
+  if (has_empty_end || disjoint)
+    return false;
+
+  IntersectingSpan joined;
+  joined.ts = std::max(t1_span.ts, t2_span.ts);
+  joined.dur = std::min(t1_end, t2_end) - joined.ts;
+  joined.join_val = join_value;
+  joined.t1_span = std::move(t1_span);
+  joined.t2_span = std::move(t2_span);
+  intersecting_spans_.emplace_back(std::move(joined));
+  return true;
+}
+
+// Done once the children are exhausted and every queued span was consumed.
+int SpanOperatorTable::FilterState::Eof() {
+  return !children_have_more_ && intersecting_spans_.empty();
+}
+
+// Reports column |N| of the span at the front of the queue. The reserved
+// columns come from the intersection itself; every other column is read from
+// the child span that owns it.
+int SpanOperatorTable::FilterState::Column(sqlite3_context* context, int N) {
+  const IntersectingSpan& span = intersecting_spans_.front();
+  if (N == Column::kTimestamp) {
+    sqlite3_result_int64(context, static_cast<sqlite3_int64>(span.ts));
+  } else if (N == Column::kDuration) {
+    sqlite3_result_int64(context, static_cast<sqlite3_int64>(span.dur));
+  } else if (N == Column::kJoinValue) {
+    sqlite3_result_int64(context, static_cast<sqlite3_int64>(span.join_val));
+  } else {
+    const std::pair<bool, size_t> loc = table_->GetTableAndColumnIndex(N);
+    const Span& child = loc.first ? span.t1_span : span.t2_span;
+    ReportSqliteResult(context, child.values[loc.second]);
+  }
+  return SQLITE_OK;
+}
+
+// Sets the SQLite result of |context| from |value| according to its type.
+PERFETTO_ALWAYS_INLINE void SpanOperatorTable::FilterState::ReportSqliteResult(
+    sqlite3_context* context,
+    SpanOperatorTable::Value value) {
+  switch (value.type) {
+    case Value::Type::kText: {
+      // SQLite copies the string (SQLITE_TRANSIENT, i.e. destructor -1)
+      // because |value| is a by-value copy destroyed on return. Avoiding the
+      // copy would need a guarantee that the child cursor is never stepped
+      // before the value is read, so that the pointer from ExtractNext stays
+      // valid.
+      const auto kSqliteTransient =
+          reinterpret_cast<sqlite3_destructor_type>(-1);
+      sqlite3_result_text(context, value.text_value.c_str(), -1,
+                          kSqliteTransient);
+      break;
+    }
+    case Value::Type::kULong:
+      sqlite3_result_int64(context,
+                           static_cast<sqlite3_int64>(value.ulong_value));
+      break;
+    case Value::Type::kUInt:
+      sqlite3_result_int(context, static_cast<int>(value.uint_value));
+      break;
+  }
+}
+
+} // namespace trace_processor
+} // namespace perfetto
diff --git a/src/trace_processor/span_operator_table.h b/src/trace_processor/span_operator_table.h
new file mode 100644
index 0000000..24d21ac
--- /dev/null
+++ b/src/trace_processor/span_operator_table.h
@@ -0,0 +1,216 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_SPAN_OPERATOR_TABLE_H_
+#define SRC_TRACE_PROCESSOR_SPAN_OPERATOR_TABLE_H_
+
+#include <sqlite3.h>
+#include <array>
+#include <deque>
+#include <limits>
+#include <map>
+#include <memory>
+
+#include "src/trace_processor/scoped_db.h"
+#include "src/trace_processor/table.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+// Implements the SPAN JOIN operation between two tables on a particular column.
+//
+// Span:
+// A span is a row with a timestamp and a duration. It is used to model
+// operations which run for a particular *span* of time.
+//
+// We draw spans like so (time on the x-axis):
+// start of span->[ time where operation is running ]<- end of span
+//
+// Multiple spans can happen in parallel:
+// [ ]
+// [ ]
+// [ ]
+// [ ]
+//
+// The above for example, models scheduling activity on a 4-core computer for a
+// short period of time.
+//
+// Span join:
+// The span join operation can be thought of as the intersection of span tables.
+// That is, the join table has a span for each pair of spans in the child tables
+// where the spans overlap. Because many spans are possible in parallel, an
+// extra metadata column (labelled the "join column") is used to partition the
+// spans of each table: only spans with equal join values are intersected.
+//
+// For a given join key suppose these were the two span tables:
+// Table 1: [ ] [ ] [ ]
+// Table 2: [ ] [ ] [ ]
+// Output : [ ] [ ] []
+//
+// All other columns apart from timestamp (ts), duration (dur) and the join key
+// are passed through unchanged.
+class SpanOperatorTable : public Table {
+ public:
+  // Columns of the span operator table.
+  enum Column {
+    kTimestamp = 0,
+    kDuration = 1,
+    kJoinValue = 2,
+    // All other columns are dynamic depending on the joined tables.
+  };
+
+  // Represents possible values of a SQLite joined table; only the member
+  // matching |type| holds meaningful data.
+  struct Value {
+    enum Type {
+      kText = 0,
+      kULong = 1,
+      kUInt = 2,
+    };
+
+    // Every member has a default. Without defaults, a Value created by
+    // std::vector::resize would carry indeterminate data in the members its
+    // type does not use, and that data would be read whenever Spans are
+    // copied.
+    Type type = kText;
+    std::string text_value;
+    uint64_t ulong_value = 0;
+    uint32_t uint_value = 0;
+  };
+
+  // Stores the definition of a column of a child table.
+  struct ColumnDefinition {
+    std::string name;
+    // Declared SQL type of the column, upper-cased.
+    std::string type_name;
+    Value::Type type = Value::Type::kText;
+  };
+
+  SpanOperatorTable(sqlite3*, const TraceStorage*);
+
+  static void RegisterTable(sqlite3* db, const TraceStorage* storage);
+
+  // Table implementation.
+  std::string CreateTableStmt(int argc, const char* const* argv) override;
+  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  int BestIndex(const QueryConstraints& qc, BestIndexInfo* info) override;
+
+ private:
+  // Number of leading columns (ts, dur and join value) which precede the
+  // pass-through columns of the child tables.
+  static constexpr uint8_t kReservedColumns = Column::kJoinValue + 1;
+
+  // Contains the definition of the child tables.
+  struct TableDefinition {
+    std::string name;
+    // Columns passed through from this table. ts, dur and the join column
+    // are removed from this list when the span table is created.
+    std::vector<ColumnDefinition> cols;
+  };
+
+  // State used when filtering on the span table.
+  class FilterState {
+   public:
+    FilterState(SpanOperatorTable*, ScopedStmt t1_stmt, ScopedStmt t2_stmt);
+
+    int Initialize();
+    int Next();
+    int Eof();
+    int Column(sqlite3_context* context, int N);
+
+   private:
+    // Details of a row of one of the child tables.
+    struct Span {
+      uint64_t ts = 0;
+      uint64_t dur = 0;
+      std::vector<Value> values;  // One for each pass-through column.
+    };
+
+    // Details of the state of retrieval from a table across all join values.
+    struct TableState {
+      // Timestamp of the next unread row, or the uint64_t maximum once the
+      // statement is exhausted.
+      uint64_t latest_ts = std::numeric_limits<uint64_t>::max();
+      size_t col_count = 0;
+      ScopedStmt stmt;
+
+      // The rows of the table indexed by the values of join column.
+      // TODO(lalitm): see how we can expand this past int64_t.
+      std::map<int64_t, Span> spans;
+    };
+
+    // A span which has data from both tables associated with it.
+    struct IntersectingSpan {
+      uint64_t ts = 0;
+      uint64_t dur = 0;
+      int64_t join_val = 0;
+      Span t1_span;
+      Span t2_span;
+    };
+
+    // Computes the next value from the child tables.
+    int ExtractNext(bool pull_t1);
+
+    // Add an intersecting span to the queue if the two child spans intersect
+    // at any point in time.
+    bool MaybeAddIntersectingSpan(int64_t join_value,
+                                  Span t1_span,
+                                  Span t2_span);
+
+    // Reports to SQLite the value given by |value| based on its type.
+    void ReportSqliteResult(sqlite3_context* context,
+                            SpanOperatorTable::Value value);
+
+    TableState t1_;
+    TableState t2_;
+
+    bool children_have_more_ = true;
+    std::deque<IntersectingSpan> intersecting_spans_;
+
+    SpanOperatorTable* const table_;
+  };
+
+  // Cursor on the span table.
+  class Cursor : public Table::Cursor {
+   public:
+    Cursor(SpanOperatorTable*, sqlite3* db);
+    ~Cursor() override;
+
+    // Table::Cursor implementation.
+    int Filter(const QueryConstraints& qc, sqlite3_value** argv) override;
+    int Next() override;
+    int Eof() override;
+    int Column(sqlite3_context* context, int N) override;
+
+   private:
+    // Prepares the ts-ordered query over one child table (|def|, which is
+    // table 1 iff |is_t1|), applying the constraints relevant to it.
+    int PrepareRawStmt(const QueryConstraints& qc,
+                       sqlite3_value** argv,
+                       const TableDefinition& def,
+                       bool is_t1,
+                       sqlite3_stmt**);
+
+    sqlite3* const db_;
+    SpanOperatorTable* const table_;
+    std::unique_ptr<FilterState> filter_state_;
+  };
+
+  // Converts a joined column index into an index on the columns of the child
+  // tables.
+  // Returns a (bool, index) pair with the bool indicating whether the index is
+  // into table 1 and the index being the offset into the relevant table's
+  // columns.
+  std::pair<bool, size_t> GetTableAndColumnIndex(int joined_column_idx);
+
+  TableDefinition t1_defn_;
+  TableDefinition t2_defn_;
+  std::string join_col_;
+
+  sqlite3* const db_;
+};
+
+} // namespace trace_processor
+} // namespace perfetto
+
+#endif // SRC_TRACE_PROCESSOR_SPAN_OPERATOR_TABLE_H_
diff --git a/src/trace_processor/span_operator_table_unittest.cc b/src/trace_processor/span_operator_table_unittest.cc
new file mode 100644
index 0000000..f262456
--- /dev/null
+++ b/src/trace_processor/span_operator_table_unittest.cc
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/span_operator_table.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "src/trace_processor/trace_processor_context.h"
+#include "src/trace_processor/trace_storage.h"
+
+namespace perfetto {
+namespace trace_processor {
+namespace {
+
+class SpanOperatorTableTest : public ::testing::Test {
+ public:
+  // Opens a fresh in-memory database with the "span" module registered.
+  SpanOperatorTableTest() {
+    sqlite3* raw_db = nullptr;
+    PERFETTO_CHECK(sqlite3_open(":memory:", &raw_db) == SQLITE_OK);
+    db_.reset(raw_db);
+
+    context_.storage.reset(new TraceStorage());
+    SpanOperatorTable::RegisterTable(db_.get(), context_.storage.get());
+  }
+
+  ~SpanOperatorTableTest() override { context_.storage->ResetStorage(); }
+
+  // Compiles |sql| into stmt_, failing the test if preparation fails.
+  void PrepareValidStatement(const std::string& sql) {
+    sqlite3_stmt* raw_stmt = nullptr;
+    const int sql_len = static_cast<int>(sql.size());
+    ASSERT_EQ(
+        sqlite3_prepare_v2(db_.get(), sql.c_str(), sql_len, &raw_stmt, nullptr),
+        SQLITE_OK);
+    stmt_.reset(raw_stmt);
+  }
+
+  // Executes |sql|, which is expected to complete without yielding rows.
+  void RunStatement(const std::string& sql) {
+    PrepareValidStatement(sql);
+    ASSERT_EQ(sqlite3_step(stmt_.get()), SQLITE_DONE);
+  }
+
+ protected:
+  // Declaration order matters: stmt_ is destroyed before db_.
+  TraceProcessorContext context_;
+  ScopedDb db_;
+  ScopedStmt stmt_;
+};
+
+TEST_F(SpanOperatorTableTest, JoinTwoSpanTables) {
+  RunStatement(
+      "CREATE TEMP TABLE f("
+      "ts UNSIGNED BIG INT PRIMARY KEY, "
+      "dur UNSIGNED BIG INT, "
+      "cpu UNSIGNED INT"
+      ");");
+  RunStatement(
+      "CREATE TEMP TABLE s("
+      "ts UNSIGNED BIG INT PRIMARY KEY, "
+      "dur UNSIGNED BIG INT, "
+      "cpu UNSIGNED INT"
+      ");");
+  RunStatement("CREATE VIRTUAL TABLE sp USING span(f, s, cpu);");
+
+  // Child rows, each (ts, dur, cpu).
+  const char* const kInserts[] = {
+      "INSERT INTO f VALUES(100, 10, 5);",
+      "INSERT INTO f VALUES(110, 50, 5);",
+      "INSERT INTO f VALUES(120, 100, 2);",
+      "INSERT INTO f VALUES(160, 10, 5);",
+      "INSERT INTO s VALUES(100, 5, 5);",
+      "INSERT INTO s VALUES(105, 100, 5);",
+      "INSERT INTO s VALUES(110, 50, 2);",
+      "INSERT INTO s VALUES(160, 100, 2);",
+  };
+  for (const char* insert : kInserts)
+    RunStatement(insert);
+
+  PrepareValidStatement("SELECT * FROM sp");
+
+  // Joined rows in the order the span table yields them.
+  struct ExpectedRow {
+    int64_t ts;
+    int64_t dur;
+    int64_t cpu;
+  };
+  const ExpectedRow kExpectedRows[] = {
+      {100, 5, 5},  {105, 5, 5},  {110, 50, 5},
+      {120, 40, 2}, {160, 60, 2}, {160, 10, 5},
+  };
+  for (const ExpectedRow& row : kExpectedRows) {
+    ASSERT_EQ(sqlite3_step(stmt_.get()), SQLITE_ROW);
+    ASSERT_EQ(sqlite3_column_int64(stmt_.get(), 0), row.ts);
+    ASSERT_EQ(sqlite3_column_int64(stmt_.get(), 1), row.dur);
+    ASSERT_EQ(sqlite3_column_int64(stmt_.get(), 2), row.cpu);
+  }
+
+  ASSERT_EQ(sqlite3_step(stmt_.get()), SQLITE_DONE);
+}
+
+} // namespace
+} // namespace trace_processor
+} // namespace perfetto
diff --git a/src/trace_processor/sqlite_utils.h b/src/trace_processor/sqlite_utils.h
index 5a8b8fb..b1e0499 100644
--- a/src/trace_processor/sqlite_utils.h
+++ b/src/trace_processor/sqlite_utils.h
@@ -43,6 +43,25 @@
return op == SQLITE_INDEX_CONSTRAINT_LT;
}
+// Converts a SQLite virtual-table constraint operator into the matching SQL
+// comparison operator, e.g. SQLITE_INDEX_CONSTRAINT_GE -> ">=". The result
+// has no surrounding whitespace, so callers splice it directly between a
+// column name and an operand. Operators without a mapping hit PERFETTO_FATAL.
+inline std::string OpToString(int op) {
+  switch (op) {
+    case SQLITE_INDEX_CONSTRAINT_EQ:
+      return "=";
+    case SQLITE_INDEX_CONSTRAINT_NE:
+      return "!=";
+    case SQLITE_INDEX_CONSTRAINT_GE:
+      return ">=";
+    case SQLITE_INDEX_CONSTRAINT_GT:
+      return ">";
+    case SQLITE_INDEX_CONSTRAINT_LE:
+      return "<=";
+    case SQLITE_INDEX_CONSTRAINT_LT:
+      return "<";
+    default:
+      PERFETTO_FATAL("Operator to string conversion not implemented for %d",
+                     op);
+  }
+}
+
} // namespace sqlite_utils
} // namespace trace_processor
} // namespace perfetto
diff --git a/src/trace_processor/table.cc b/src/trace_processor/table.cc
index 75eac08..ac0e4b8 100644
--- a/src/trace_processor/table.cc
+++ b/src/trace_processor/table.cc
@@ -52,6 +52,7 @@
void Table::RegisterInternal(sqlite3* db,
const TraceStorage* storage,
const std::string& table_name,
+ bool read_write,
Factory factory) {
std::unique_ptr<TableDescriptor> desc(new TableDescriptor());
desc->storage = storage;
@@ -60,8 +61,8 @@
sqlite3_module* module = &desc->module;
memset(module, 0, sizeof(*module));
- module->xConnect = [](sqlite3* xdb, void* arg, int argc,
- const char* const* argv, sqlite3_vtab** tab, char**) {
+ auto create_fn = [](sqlite3* xdb, void* arg, int argc,
+ const char* const* argv, sqlite3_vtab** tab, char**) {
const TableDescriptor* xdesc = static_cast<const TableDescriptor*>(arg);
auto table = xdesc->factory(xdb, xdesc->storage);
@@ -79,11 +80,15 @@
return SQLITE_OK;
};
+ module->xCreate = create_fn;
+ module->xConnect = create_fn;
- module->xDisconnect = [](sqlite3_vtab* t) {
+ auto destroy_fn = [](sqlite3_vtab* t) {
delete ToTable(t);
return SQLITE_OK;
};
+ module->xDisconnect = destroy_fn;
+ module->xDestroy = destroy_fn;
module->xOpen = [](sqlite3_vtab* t, sqlite3_vtab_cursor** c) {
return ToTable(t)->OpenInternal(c);
@@ -108,8 +113,8 @@
return ToCursor(c)->Column(a, b);
};
- module->xRowid = [](sqlite3_vtab_cursor*, sqlite_int64*) {
- return SQLITE_ERROR;
+ module->xRowid = [](sqlite3_vtab_cursor* c, sqlite3_int64* r) {
+ return ToCursor(c)->RowId(r);
};
module->xFindFunction =
@@ -117,6 +122,13 @@
void (**fn)(sqlite3_context*, int, sqlite3_value**),
void** args) { return ToTable(t)->FindFunction(name, fn, args); };
+ if (read_write) {
+ module->xUpdate = [](sqlite3_vtab* t, int a, sqlite3_value** v,
+ sqlite3_int64* r) {
+ return ToTable(t)->Update(a, v, r);
+ };
+ }
+
int res = sqlite3_create_module_v2(
db, table_name.c_str(), module, desc.release(),
[](void* arg) { delete static_cast<TableDescriptor*>(arg); });
@@ -186,10 +198,18 @@
int Table::FindFunction(const char*, FindFunctionFn, void**) {
return 0;
-};
+}
+
+// Default xUpdate handler: refuses every write. It is only reachable for
+// tables registered with |read_write| = true.
+int Table::Update(int /*argc*/, sqlite3_value** /*argv*/,
+                  sqlite3_int64* /*rowid*/) {
+  return SQLITE_READONLY;
+}
Table::Cursor::~Cursor() = default;
+// Default xRowid handler: cursors expose no rowid unless they override this.
+int Table::Cursor::RowId(sqlite3_int64* /*rowid*/) {
+  return SQLITE_ERROR;
+}
+
int Table::Cursor::FilterInternal(int idxNum,
const char* idxStr,
int argc,
diff --git a/src/trace_processor/table.h b/src/trace_processor/table.h
index f5da6f3..26992f9 100644
--- a/src/trace_processor/table.h
+++ b/src/trace_processor/table.h
@@ -57,6 +57,9 @@
virtual int Eof() = 0;
virtual int Column(sqlite3_context* context, int N) = 0;
+ // Optional methods to implement.
+ virtual int RowId(sqlite3_int64*);
+
private:
friend class Table;
@@ -79,8 +82,9 @@
template <typename T>
static void Register(sqlite3* db,
const TraceStorage* storage,
- const std::string& name) {
- RegisterInternal(db, storage, name, GetFactory<T>());
+ const std::string& name,
+ bool read_write = false) {
+ RegisterInternal(db, storage, name, read_write, GetFactory<T>());
}
// Methods to be implemented by derived table classes.
@@ -92,6 +96,9 @@
using FindFunctionFn = void (**)(sqlite3_context*, int, sqlite3_value**);
virtual int FindFunction(const char* name, FindFunctionFn fn, void** args);
+  // Tables overriding this must also be registered with |read_write| set to
+  // true; otherwise xUpdate is not installed and SQLite rejects writes.
+ virtual int Update(int, sqlite3_value**, sqlite3_int64*);
+
private:
template <typename TableType>
static Factory GetFactory() {
@@ -103,6 +110,7 @@
static void RegisterInternal(sqlite3* db,
const TraceStorage*,
const std::string& name,
+ bool read_write,
Factory);
// Overriden functions from sqlite3_vtab.
diff --git a/src/trace_processor/trace_processor.cc b/src/trace_processor/trace_processor.cc
index 89ff3c8..61c1b80 100644
--- a/src/trace_processor/trace_processor.cc
+++ b/src/trace_processor/trace_processor.cc
@@ -29,10 +29,12 @@
#include "src/trace_processor/sched_tracker.h"
#include "src/trace_processor/slice_table.h"
#include "src/trace_processor/slice_tracker.h"
+#include "src/trace_processor/span_operator_table.h"
#include "src/trace_processor/string_table.h"
#include "src/trace_processor/table.h"
#include "src/trace_processor/thread_table.h"
#include "src/trace_processor/trace_sorter.h"
+#include "src/trace_processor/window_operator_table.h"
#include "perfetto/trace_processor/raw_query.pb.h"
@@ -58,6 +60,8 @@
StringTable::RegisterTable(*db_, context_.storage.get());
ThreadTable::RegisterTable(*db_, context_.storage.get());
CountersTable::RegisterTable(*db_, context_.storage.get());
+ SpanOperatorTable::RegisterTable(*db_, context_.storage.get());
+ WindowOperatorTable::RegisterTable(*db_, context_.storage.get());
}
TraceProcessor::~TraceProcessor() = default;
diff --git a/src/trace_processor/trace_storage.cc b/src/trace_processor/trace_storage.cc
index 43c5944..4443af0 100644
--- a/src/trace_processor/trace_storage.cc
+++ b/src/trace_processor/trace_storage.cc
@@ -37,8 +37,8 @@
uint64_t start_ns,
uint64_t duration_ns,
UniqueTid utid) {
- cpu_events_[cpu].AddSlice(start_ns, duration_ns, utid);
-};
+ slices_.AddSlice(cpu, start_ns, duration_ns, utid);
+}
StringId TraceStorage::InternString(base::StringView str) {
auto hash = str.Hash();
diff --git a/src/trace_processor/trace_storage.h b/src/trace_processor/trace_storage.h
index 47ccbf7..fb4e92c 100644
--- a/src/trace_processor/trace_storage.h
+++ b/src/trace_processor/trace_storage.h
@@ -78,11 +78,13 @@
uint32_t tid = 0;
};
- class SlicesPerCpu {
+ class Slices {
public:
- inline void AddSlice(uint64_t start_ns,
+ inline void AddSlice(uint32_t cpu,
+ uint64_t start_ns,
uint64_t duration_ns,
UniqueTid utid) {
+ cpus_.emplace_back(cpu);
start_ns_.emplace_back(start_ns);
durations_.emplace_back(duration_ns);
utids_.emplace_back(utid);
@@ -90,6 +92,8 @@
size_t slice_count() const { return start_ns_.size(); }
+ const std::deque<uint32_t>& cpus() const { return cpus_; }
+
const std::deque<uint64_t>& start_ns() const { return start_ns_; }
const std::deque<uint64_t>& durations() const { return durations_; }
@@ -99,6 +103,7 @@
private:
// Each deque below has the same number of entries (the number of slices
// in the trace for the CPU).
+ std::deque<uint32_t> cpus_;
std::deque<uint64_t> start_ns_;
std::deque<uint64_t> durations_;
std::deque<UniqueTid> utids_;
@@ -225,11 +230,6 @@
}
// Reading methods.
- const SlicesPerCpu& SlicesForCpu(uint32_t cpu) const {
- PERFETTO_DCHECK(cpu < cpu_events_.size());
- return cpu_events_[cpu];
- }
-
const std::string& GetString(StringId id) const {
PERFETTO_DCHECK(id < string_pool_.size());
return string_pool_[id];
@@ -246,6 +246,7 @@
return unique_threads_[utid];
}
+ const Slices& slices() const { return slices_; }
const NestableSlices& nestable_slices() const { return nestable_slices_; }
NestableSlices* mutable_nestable_slices() { return &nestable_slices_; }
@@ -272,7 +273,7 @@
Stats stats_;
// One entry for each CPU in the trace.
- std::array<SlicesPerCpu, base::kMaxCpus> cpu_events_;
+ Slices slices_;
// One entry for each unique string in the trace.
std::deque<std::string> string_pool_;
diff --git a/src/trace_processor/window_operator_table.cc b/src/trace_processor/window_operator_table.cc
new file mode 100644
index 0000000..3648081
--- /dev/null
+++ b/src/trace_processor/window_operator_table.cc
@@ -0,0 +1,209 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/window_operator_table.h"
+
+#include "src/trace_processor/sqlite_utils.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+namespace {
+using namespace sqlite_utils;
+}  // namespace
+
+WindowOperatorTable::WindowOperatorTable(sqlite3*, const TraceStorage*) {}
+
+void WindowOperatorTable::RegisterTable(sqlite3* db,
+                                        const TraceStorage* storage) {
+  // Registered as read-write (last argument) so that UPDATE statements reach
+  // Update(), which is how the window parameters are configured.
+  Table::Register<WindowOperatorTable>(db, storage, "window", true);
+}
+
+std::string WindowOperatorTable::CreateTableStmt(int, const char* const*) {
+  return "CREATE TABLE x("
+         // These are the operator columns:
+         "rowid HIDDEN UNSIGNED BIG INT, "
+         "quantum HIDDEN UNSIGNED BIG INT, "
+         "window_start HIDDEN UNSIGNED BIG INT, "
+         "window_dur HIDDEN UNSIGNED BIG INT, "
+         // These are the output columns:
+         "ts UNSIGNED BIG INT, "
+         "dur UNSIGNED BIG INT, "
+         "cpu UNSIGNED INT, "
+         "quantum_ts UNSIGNED BIG INT, "
+         "PRIMARY KEY(rowid)"
+         ") WITHOUT ROWID;";
+}
+
+std::unique_ptr<Table::Cursor> WindowOperatorTable::CreateCursor() {
+  // The cursor snapshots the current window. A |quantum_| of 0 means the
+  // whole window is emitted as a single span.
+  uint64_t window_end = window_start_ + window_dur_;
+  uint64_t step_size = quantum_ == 0 ? window_dur_ : quantum_;
+  return std::unique_ptr<Table::Cursor>(
+      new Cursor(this, window_start_, window_end, step_size));
+}
+
+int WindowOperatorTable::BestIndex(const QueryConstraints& qc,
+                                   BestIndexInfo* info) {
+  // Remove ordering on timestamp if it is the only ordering as we are already
+  // sorted on TS. This makes span joining significantly faster.
+  if (qc.order_by().size() == 1 && qc.order_by()[0].iColumn == Column::kTs &&
+      !qc.order_by()[0].desc) {
+    info->order_by_consumed = true;
+  }
+  return SQLITE_OK;
+}
+
+int WindowOperatorTable::Update(int argc,
+                                sqlite3_value** argv,
+                                sqlite3_int64*) {
+  // We only support updates of the quantum, window_start and window_dur
+  // operator columns. Disallow deletes (argc == 1) and inserts
+  // (argv[0] == NULL), and never index past the end of |argv| for the
+  // columns read below.
+  if (argc <= 2 + Column::kWindowDur ||
+      sqlite3_value_type(argv[0]) == SQLITE_NULL)
+    return SQLITE_READONLY;
+
+  // For an UPDATE, argv[2 + N] holds the new value of column N.
+  quantum_ =
+      static_cast<uint64_t>(sqlite3_value_int64(argv[2 + Column::kQuantum]));
+  window_start_ = static_cast<uint64_t>(
+      sqlite3_value_int64(argv[2 + Column::kWindowStart]));
+  window_dur_ =
+      static_cast<uint64_t>(sqlite3_value_int64(argv[2 + Column::kWindowDur]));
+
+  return SQLITE_OK;
+}
+
+WindowOperatorTable::Cursor::Cursor(const WindowOperatorTable* table,
+                                    uint64_t window_start,
+                                    uint64_t window_end,
+                                    uint64_t step_size)
+    : window_start_(window_start),
+      window_end_(window_end),
+      step_size_(step_size),
+      table_(table) {}
+
+int WindowOperatorTable::Cursor::Column(sqlite3_context* context, int N) {
+  switch (N) {
+    case Column::kQuantum: {
+      sqlite3_result_int64(context,
+                           static_cast<sqlite_int64>(table_->quantum_));
+      break;
+    }
+    case Column::kWindowStart: {
+      sqlite3_result_int64(context,
+                           static_cast<sqlite_int64>(table_->window_start_));
+      break;
+    }
+    case Column::kWindowDur: {
+      // Must be reported as a 64-bit value: the default duration is the max
+      // of int64_t, which does not fit in an int.
+      sqlite3_result_int64(context,
+                           static_cast<sqlite_int64>(table_->window_dur_));
+      break;
+    }
+    case Column::kTs: {
+      sqlite3_result_int64(context, static_cast<sqlite_int64>(current_ts_));
+      break;
+    }
+    case Column::kDuration: {
+      sqlite3_result_int64(context, static_cast<sqlite_int64>(step_size_));
+      break;
+    }
+    case Column::kCpu: {
+      sqlite3_result_int(context, static_cast<int>(current_cpu_));
+      break;
+    }
+    case Column::kQuantumTs: {
+      sqlite3_result_int64(context, static_cast<sqlite_int64>(quantum_ts_));
+      break;
+    }
+    case Column::kRowId: {
+      sqlite3_result_int64(context, static_cast<sqlite_int64>(row_id_));
+      break;
+    }
+    default: {
+      PERFETTO_FATAL("Unknown column %d", N);
+      break;
+    }
+  }
+  return SQLITE_OK;
+}
+
+int WindowOperatorTable::Cursor::Filter(const QueryConstraints& qc,
+                                        sqlite3_value** v) {
+  current_ts_ = window_start_;
+  current_cpu_ = 0;
+  quantum_ts_ = 0;
+  row_id_ = 0;
+
+  // Set return first if there is a equals constraint on the row id asking to
+  // return the first row. The rowid is read as int64: sqlite3_value_int()
+  // would truncate it to 32 bits and e.g. turn 1 << 32 into a match for 0.
+  bool return_first = qc.constraints().size() == 1 &&
+                      qc.constraints()[0].iColumn == Column::kRowId &&
+                      IsOpEq(qc.constraints()[0].op) &&
+                      sqlite3_value_int64(v[0]) == 0;
+  // Set return CPU if there is an equals constraint on the CPU column.
+  bool return_cpu = qc.constraints().size() == 1 &&
+                    qc.constraints()[0].iColumn == Column::kCpu &&
+                    IsOpEq(qc.constraints()[0].op);
+  if (return_first) {
+    filter_type_ = FilterType::kReturnFirst;
+  } else if (return_cpu) {
+    filter_type_ = FilterType::kReturnCpu;
+    current_cpu_ = static_cast<uint32_t>(sqlite3_value_int(v[0]));
+  } else {
+    filter_type_ = FilterType::kReturnAll;
+  }
+  return SQLITE_OK;
+}
+
+int WindowOperatorTable::Cursor::Next() {
+  switch (filter_type_) {
+    case FilterType::kReturnFirst:
+      // Only a single row is produced: jump straight to the end.
+      current_ts_ = window_end_;
+      break;
+    case FilterType::kReturnCpu:
+      current_ts_ += step_size_;
+      quantum_ts_++;
+      break;
+    case FilterType::kReturnAll:
+      // Emit one row per CPU for the current timestamp before advancing to
+      // the next quantum.
+      if (++current_cpu_ == base::kMaxCpus && current_ts_ < window_end_) {
+        current_cpu_ = 0;
+        current_ts_ += step_size_;
+        quantum_ts_++;
+      }
+      break;
+  }
+  row_id_++;
+  return SQLITE_OK;
+}
+
+int WindowOperatorTable::Cursor::Eof() {
+  return current_ts_ >= window_end_;
+}
+
+}  // namespace trace_processor
+}  // namespace perfetto
diff --git a/src/trace_processor/window_operator_table.h b/src/trace_processor/window_operator_table.h
new file mode 100644
index 0000000..8b2ac93
--- /dev/null
+++ b/src/trace_processor/window_operator_table.h
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_WINDOW_OPERATOR_TABLE_H_
+#define SRC_TRACE_PROCESSOR_WINDOW_OPERATOR_TABLE_H_
+
+#include <limits>
+#include <memory>
+
+#include "src/trace_processor/table.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+class TraceStorage;
+
+// The "window" virtual table module. Tables created from it generate spans
+// whose start timestamps step by |quantum| through
+// [window_start, window_start + window_dur); a |quantum| of 0 produces a
+// single span covering the whole window. Each span is emitted once per CPU,
+// or only for one CPU when `cpu = X` is the query's sole constraint.
+// The operator columns are configured with an UPDATE, e.g.
+//   CREATE VIRTUAL TABLE w USING window;
+//   UPDATE w SET window_start = 0, window_dur = 100 WHERE rowid = 0;
+class WindowOperatorTable : public Table {
+ public:
+  // Column indices; these must match the column order in CreateTableStmt().
+  enum Column {
+    kRowId = 0,
+    kQuantum = 1,
+    kWindowStart = 2,
+    kWindowDur = 3,
+    kTs = 4,
+    kDuration = 5,
+    kCpu = 6,
+    kQuantumTs = 7
+  };
+
+  static void RegisterTable(sqlite3* db, const TraceStorage* storage);
+
+  WindowOperatorTable(sqlite3*, const TraceStorage*);
+
+  // Table implementation.
+  std::string CreateTableStmt(int argc, const char* const* argv) override;
+  std::unique_ptr<Table::Cursor> CreateCursor() override;
+  int BestIndex(const QueryConstraints&, BestIndexInfo*) override;
+  // Sets the quantum, window_start and window_dur operator columns. INSERT
+  // and DELETE are rejected with SQLITE_READONLY.
+  int Update(int, sqlite3_value**, sqlite3_int64*) override;
+
+ private:
+  class Cursor : public Table::Cursor {
+   public:
+    Cursor(const WindowOperatorTable*,
+           uint64_t window_start,
+           uint64_t window_end,
+           uint64_t step_size);
+
+    // Implementation of Table::Cursor.
+    int Filter(const QueryConstraints&, sqlite3_value**) override;
+    int Next() override;
+    int Eof() override;
+    int Column(sqlite3_context*, int N) override;
+
+   private:
+    // Defines the data to be generated by the table. Scoped so that the
+    // enumerators do not leak into the Cursor class scope.
+    enum class FilterType {
+      // Returns all the spans for all CPUs.
+      kReturnAll = 0,
+      // Only returns the first span of the table. Useful for UPDATE operations.
+      kReturnFirst = 1,
+      // Only returns all the spans for a chosen CPU.
+      kReturnCpu = 2,
+    };
+
+    // Window bounds and step size, fixed when the cursor is created.
+    uint64_t const window_start_;
+    uint64_t const window_end_;
+    uint64_t const step_size_;
+    const WindowOperatorTable* const table_;
+
+    // Iteration state, reset by Filter() and advanced by Next().
+    uint64_t current_ts_ = 0;
+    uint32_t current_cpu_ = 0;
+    uint64_t quantum_ts_ = 0;
+    uint64_t row_id_ = 0;
+
+    FilterType filter_type_ = FilterType::kReturnAll;
+  };
+
+  uint64_t quantum_ = 0;
+  uint64_t window_start_ = 0;
+
+  // max of int64_t because SQLite technically only supports int64s and not
+  // uint64s.
+  uint64_t window_dur_ =
+      static_cast<uint64_t>(std::numeric_limits<int64_t>::max());
+};
+
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_WINDOW_OPERATOR_TABLE_H_
diff --git a/src/traced/probes/ftrace/ftrace_controller.h b/src/traced/probes/ftrace/ftrace_controller.h
index 3de16cd..b950e31 100644
--- a/src/traced/probes/ftrace/ftrace_controller.h
+++ b/src/traced/probes/ftrace/ftrace_controller.h
@@ -89,7 +89,6 @@
friend class TestFtraceController;
FRIEND_TEST(FtraceControllerIntegrationTest, EnableDisableEvent);
-
FtraceController(const FtraceController&) = delete;
FtraceController& operator=(const FtraceController&) = delete;
diff --git a/ui/src/tracks/cpu_slices/controller.ts b/ui/src/tracks/cpu_slices/controller.ts
index 17a1b0f..77fbdf0 100644
--- a/ui/src/tracks/cpu_slices/controller.ts
+++ b/ui/src/tracks/cpu_slices/controller.ts
@@ -23,49 +23,82 @@
class CpuSliceTrackController extends TrackController<Config, Data> {
static readonly kind = CPU_SLICE_TRACK_KIND;
private busy = false;
+  private setup = false;
- onBoundsChange(start: number, end: number, resolution: number) {
+  onBoundsChange(start: number, end: number, resolution: number): void {
+    this.update(start, end, resolution);
+  }
+
+  // Sets window_<id> to [start, end] and publishes this CPU's slices in it.
+  private async update(start: number, end: number, resolution: number):
+      Promise<void> {
// TODO: we should really call TraceProcessor.Interrupt() at this point.
if (this.busy) return;
- const LIMIT = 10000;
- const query = 'select ts,dur,utid from sched ' +
- `where cpu = ${this.config.cpu} ` +
- `and ts_lower_bound = ${Math.round(start * 1e9)} ` +
- `and ts <= ${Math.round(end * 1e9)} ` +
- `and dur >= ${Math.round(resolution * 1e9)} ` +
- `and utid != 0 ` +
- `order by ts ` +
- `limit ${LIMIT};`;
+    const startNs = Math.round(start * 1e9);
+    const endNs = Math.round(end * 1e9);
+    const resolutionNs = Math.round(resolution * 1e9);
this.busy = true;
- this.engine.query(query).then(rawResult => {
- this.busy = false;
- if (rawResult.error) {
- throw new Error(`Query error "${query}": ${rawResult.error}`);
- }
- const numRows = +rawResult.numRecords;
+    try {
+      if (this.setup === false) {
+        await this.query(
+            `create virtual table window_${this.trackState.id} using window;`);
+        await this.query(`create virtual table span_${this.trackState.id}
+            using span(sched, window_${this.trackState.id}, cpu);`);
+        this.setup = true;
+      }
- const slices: Data = {
- start,
- end,
- resolution,
- starts: new Float64Array(numRows),
- ends: new Float64Array(numRows),
- utids: new Uint32Array(numRows),
- };
+      await this.query(`update window_${this.trackState.id} set
+          window_start=${startNs},
+          window_dur=${endNs - startNs}
+          where rowid = 0;`);  // Must land before span_ is queried below.
- for (let row = 0; row < numRows; row++) {
- const cols = rawResult.columns;
- const startSec = fromNs(+cols[0].longValues![row]);
- slices.starts[row] = startSec;
- slices.ends[row] = startSec + fromNs(+cols[1].longValues![row]);
- slices.utids[row] = +cols[2].longValues![row];
- }
- if (numRows === LIMIT) {
- slices.end = slices.ends[slices.ends.length - 1];
- }
- this.publish(slices);
- });
+      const LIMIT = 10000;
+      const query = `select ts,dur,utid from span_${this.trackState.id}
+          where cpu = ${this.config.cpu}
+          and utid != 0
+          and dur >= ${resolutionNs}
+          limit ${LIMIT};`;
+      const rawResult = await this.query(query);
+      const numRows = +rawResult.numRecords;
+      const slices: Data = {
+        start,
+        end,
+        resolution,
+        starts: new Float64Array(numRows),
+        ends: new Float64Array(numRows),
+        utids: new Uint32Array(numRows),
+      };
+      for (let row = 0; row < numRows; row++) {
+        const cols = rawResult.columns;
+        const startSec = fromNs(+cols[0].longValues![row]);
+        slices.starts[row] = startSec;
+        slices.ends[row] = startSec + fromNs(+cols[1].longValues![row]);
+        slices.utids[row] = +cols[2].longValues![row];
+      }
+      if (numRows === LIMIT) {
+        slices.end = slices.ends[slices.ends.length - 1];
+      }
+      this.publish(slices);
+    } finally {  // Reset |busy| even if a query fails, or the track wedges.
+      this.busy = false;
+    }
+  }
+
+  private async query(query: string) {
+    const result = await this.engine.query(query);
+    if (result.error) {
+      throw new Error(`Query error "${query}": ${result.error}`);
+    }
+    return result;
+  }
+
+  onDestroy(): void {
+    if (this.setup) {  // span_ is defined on top of window_: drop it first.
+      this.query(`drop table span_${this.trackState.id}`)
+          .then(() => this.query(`drop table window_${this.trackState.id}`));
+      this.setup = false;
+    }
}
}