Introduce base::CircularQueue and improve trace processor sorter

This CL introduces a queue container backed by a circular array.
This vastly improves TraceSorter's performance, because it makes
delete-front O(1) (% invoking dtors) while still allowing std::sort()
of the elements at the same cost as a vector.
This solution is also faster than using a std::deque for |events_|
instead of the std::vector, because CircularQueue is far simpler
than a std::deque.

Benchmark of the sorting stage, obtained by commenting out the parsing
stage and running on a 7GB trace [1] on a MacBook Pro 15 2018:
std::vector: 70 MB/s
std::deque: 260 MB/s
CircularQueue: 285 MB/s
More CLs will further improve the sorter, but for now this CL alone
makes the trace processor cope better with large traces.
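
A minimal usage sketch of the new container (illustrative only, not
part of this CL's diff):

  base::CircularQueue<int64_t> q(/*initial_capacity=*/8);  // Must be a pow2.
  q.emplace_back(3);
  q.emplace_back(1);
  q.emplace_back(2);
  std::sort(q.begin(), q.end());  // Doable thanks to random access.
  q.erase_front(2);  // Pops 1 and 2; no shifting of the tail elements.
  // q.front() == 3 at this point.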
[1] SHA1: ad54e7de3a7,
trace-walleye-QP1A.190211.001-2019-02-15-20-27-27.perfetto-trace
from www/~primiano/perfetto/sample_traces

Test: perfetto_unittests --gtest_filter=CircularQueueTest.*
Change-Id: Ie804f54e08d7a9d374aefbe6141cbba65ddf25ec
diff --git a/Android.bp b/Android.bp
index 72ca25d..4d8e723 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2585,6 +2585,7 @@
":perfetto_src_traced_probes_ftrace_test_messages_lite_gen",
":perfetto_src_traced_probes_ftrace_test_messages_zero_gen",
"src/base/android_task_runner.cc",
+ "src/base/circular_queue_unittest.cc",
"src/base/event.cc",
"src/base/file_utils.cc",
"src/base/metatrace.cc",
diff --git a/include/perfetto/base/BUILD.gn b/include/perfetto/base/BUILD.gn
index 3e02aa2..10d8790 100644
--- a/include/perfetto/base/BUILD.gn
+++ b/include/perfetto/base/BUILD.gn
@@ -17,6 +17,7 @@
source_set("base") {
sources = [
"build_config.h",
+ "circular_queue.h",
"container_annotations.h",
"event.h",
"export.h",
diff --git a/include/perfetto/base/circular_queue.h b/include/perfetto/base/circular_queue.h
new file mode 100644
index 0000000..6e8d917
--- /dev/null
+++ b/include/perfetto/base/circular_queue.h
@@ -0,0 +1,301 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_BASE_CIRCULAR_QUEUE_H_
+#define INCLUDE_PERFETTO_BASE_CIRCULAR_QUEUE_H_
+
+#include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <iterator>
+#include <new>
+#include <utility>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/utils.h"
+
+namespace perfetto {
+namespace base {
+
+// CircularQueue is a push-back-only / pop-front-only queue with the following
+// characteristics:
+// - The storage is based on a flat circular buffer. Beginning and end wrap
+// as necessary, to keep pushes and pops O(1) as long as capacity expansion is
+// not required.
+// - Capacity is automatically expanded, like in a std::vector. Expansion has
+// an O(N) cost.
+// - It allows random access, enabling in-place std::sort.
+// - Iterators are not stable. Mutating the container invalidates all iterators.
+// - It doesn't bother with const-correctness.
+//
+// Implementation details:
+// Internally, |begin|, |end| and iterators use 64-bit monotonic indexes, which
+// are incremented as if the queue was backed by unlimited storage.
+// Even assuming that elements are inserted and removed every nanosecond, 64 bit
+// is enough for 584 years.
+// Wrapping happens only when addressing elements in the underlying circular
+// storage. This limits the complexity and avoids dealing with modular
+// arithmetic all over the place.
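+//
+// Illustrative example (assuming a capacity of 8): a monotonic position |pos|
+// maps to the physical slot |pos & (capacity - 1)|, so a queue spanning
+// positions [14, 17) keeps its three elements in slots 6, 7 and 0.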
+template <class T>
+class CircularQueue {
+ public:
+ class Iterator {
+ public:
+ using difference_type = ptrdiff_t;
+ using value_type = T;
+ using pointer = const T*;
+ using reference = const T&;
+ using iterator_category = std::random_access_iterator_tag;
+
+ Iterator(CircularQueue* queue, uint64_t pos, uint32_t generation)
+ : queue_(queue),
+ pos_(pos)
+#if PERFETTO_DCHECK_IS_ON()
+ ,
+ generation_(generation)
+#endif
+ {
+ ignore_result(generation);
+ }
+
+ T* operator->() {
+#if PERFETTO_DCHECK_IS_ON()
+ PERFETTO_DCHECK(generation_ == queue_->generation());
+#endif
+ return queue_->Get(pos_);
+ }
+
+ T& operator*() { return *(operator->()); }
+
+ const value_type& operator[](difference_type i) const {
+ return *(*this + i);
+ }
+
+ Iterator& operator++() {
+ Add(1);
+ return *this;
+ }
+
+ Iterator operator++(int) {
+ Iterator ret = *this;
+ Add(1);
+ return ret;
+ }
+
+ Iterator& operator--() {
+ Add(-1);
+ return *this;
+ }
+
+ Iterator operator--(int) {
+ Iterator ret = *this;
+ Add(-1);
+ return ret;
+ }
+
+ friend Iterator operator+(const Iterator& iter, difference_type offset) {
+ Iterator ret = iter;
+ ret.Add(offset);
+ return ret;
+ }
+
+ Iterator& operator+=(difference_type offset) {
+ Add(offset);
+ return *this;
+ }
+
+ friend Iterator operator-(const Iterator& iter, difference_type offset) {
+ Iterator ret = iter;
+ ret.Add(-offset);
+ return ret;
+ }
+
+ Iterator& operator-=(difference_type offset) {
+ Add(-offset);
+ return *this;
+ }
+
+ friend ptrdiff_t operator-(const Iterator& lhs, const Iterator& rhs) {
+ return static_cast<ptrdiff_t>(lhs.pos_) -
+ static_cast<ptrdiff_t>(rhs.pos_);
+ }
+
+ friend bool operator==(const Iterator& lhs, const Iterator& rhs) {
+ return lhs.pos_ == rhs.pos_;
+ }
+
+ friend bool operator!=(const Iterator& lhs, const Iterator& rhs) {
+ return lhs.pos_ != rhs.pos_;
+ }
+
+ friend bool operator<(const Iterator& lhs, const Iterator& rhs) {
+ return lhs.pos_ < rhs.pos_;
+ }
+
+ friend bool operator<=(const Iterator& lhs, const Iterator& rhs) {
+ return lhs.pos_ <= rhs.pos_;
+ }
+
+ friend bool operator>(const Iterator& lhs, const Iterator& rhs) {
+ return lhs.pos_ > rhs.pos_;
+ }
+
+ friend bool operator>=(const Iterator& lhs, const Iterator& rhs) {
+ return lhs.pos_ >= rhs.pos_;
+ }
+
+ private:
+ inline void Add(difference_type offset) {
+ pos_ = static_cast<uint64_t>(static_cast<difference_type>(pos_) + offset);
+ PERFETTO_DCHECK(pos_ <= queue_->end_);
+ }
+
+ CircularQueue* queue_;
+ uint64_t pos_;
+
+#if PERFETTO_DCHECK_IS_ON()
+ uint32_t generation_;
+#endif
+ };
+
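+  // Note: |initial_capacity| must be a power of two; Grow() CHECKs this.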
+ CircularQueue(size_t initial_capacity = 1024) { Grow(initial_capacity); }
+
+ CircularQueue(CircularQueue&& other) noexcept {
+ // Copy all fields using the (private) default copy assignment operator.
+ *this = other;
+ increment_generation();
+ new (&other) CircularQueue(); // Reset the old queue so it's still usable.
+ }
+
+ CircularQueue& operator=(CircularQueue&& other) {
+ this->~CircularQueue(); // Destroy the current state.
+ new (this) CircularQueue(std::move(other)); // Use the move ctor above.
+ return *this;
+ }
+
+ ~CircularQueue() {
+ if (!entries_) {
+ PERFETTO_DCHECK(empty());
+ return;
+ }
+ erase_front(size()); // Invoke destructors on all alive entries.
+ PERFETTO_DCHECK(empty());
+ free(entries_);
+ }
+
+ template <typename... Args>
+ void emplace_back(Args&&... args) {
+ increment_generation();
+ if (PERFETTO_UNLIKELY(size() >= capacity_))
+ Grow();
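+    // Note: the argument's post-increment bumps |end_| before Get() executes
+    // (argument side effects are sequenced before the call), so Get()'s
+    // pos < end_ DCHECK holds for the newly claimed slot.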
+ T* slot = Get(end_++);
+ new (slot) T(std::forward<Args>(args)...);
+ }
+
+ void erase_front(size_t n) {
+ increment_generation();
+ for (; n && (begin_ < end_); --n) {
+ Get(begin_)->~T();
+ begin_++; // This needs to be its own statement, Get() checks begin_.
+ }
+ }
+
+ void pop_front() { erase_front(1); }
+
+ T& at(size_t idx) {
+ PERFETTO_DCHECK(idx < size());
+ return *Get(begin_ + idx);
+ }
+
+ Iterator begin() { return Iterator(this, begin_, generation()); }
+ Iterator end() { return Iterator(this, end_, generation()); }
+ T& front() { return *begin(); }
+ T& back() { return *(end() - 1); }
+
+ bool empty() const { return size() == 0; }
+
+ size_t size() const {
+ PERFETTO_DCHECK(end_ - begin_ <= capacity_);
+ return static_cast<size_t>(end_ - begin_);
+ }
+
+ size_t capacity() const { return capacity_; }
+
+#if PERFETTO_DCHECK_IS_ON()
+ uint32_t generation() const { return generation_; }
+ void increment_generation() { ++generation_; }
+#else
+ uint32_t generation() const { return 0; }
+ void increment_generation() {}
+#endif
+
+ private:
+ CircularQueue(const CircularQueue&) = delete;
+ CircularQueue& operator=(const CircularQueue&) = default;
+
+ void Grow(size_t new_capacity = 0) {
+    // Capacity must always be a power of two. This allows Get() to use a
+    // simple bitwise-AND for handling the wrapping instead of a full division.
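+    // (Illustrative: with capacity 1024, Get() maps pos 1030 to slot
+    // 1030 & 1023 == 6, equivalent to 1030 % 1024 but cheaper.)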
+ new_capacity = new_capacity ? new_capacity : capacity_ * 2;
+ PERFETTO_CHECK((new_capacity & (new_capacity - 1)) == 0); // Must be pow2.
+
+ // On 32-bit systems this might hit the 4GB wall and overflow. We can't do
+ // anything other than crash in this case.
+ PERFETTO_CHECK(new_capacity > capacity_);
+ size_t malloc_size = new_capacity * sizeof(T);
+ PERFETTO_CHECK(malloc_size > new_capacity);
+ auto* new_vec = static_cast<T*>(malloc(malloc_size));
+
+    // Move all elements into the expanded array.
+ size_t new_size = 0;
+ for (uint64_t i = begin_; i < end_; i++)
+ new (&new_vec[new_size++]) T(std::move(*Get(i))); // Placement move ctor.
+
+    // Even though all the elements have been std::move()-d (and are likely
+    // empty), we are still required to call their dtors.
+ for (uint64_t i = begin_; i < end_; i++)
+ Get(i)->~T();
+ free(entries_); // It's fine to free(nullptr) (for the ctor call case).
+
+ begin_ = 0;
+ end_ = new_size;
+ capacity_ = new_capacity;
+ entries_ = new_vec;
+ }
+
+ inline T* Get(uint64_t pos) {
+ PERFETTO_DCHECK(pos >= begin_ && pos < end_);
+ PERFETTO_DCHECK((capacity_ & (capacity_ - 1)) == 0); // Must be a pow2.
+ auto index = static_cast<size_t>(pos & (capacity_ - 1));
+ return &entries_[index];
+ }
+
+ // Underlying storage. It's raw malloc-ed rather than being a unique_ptr<T[]>
+ // to allow having uninitialized entries inside it.
+ T* entries_ = nullptr;
+ size_t capacity_ = 0; // Number of allocated slots (NOT bytes) in |entries_|.
+
+  // The |begin_| and |end_| indexes are monotonic and never wrap. Modular
+  // arithmetic is used only when dereferencing entries in the underlying
+  // storage.
+ uint64_t begin_ = 0;
+ uint64_t end_ = 0;
+
+// Generation is used in debug builds only for checking iterator validity.
+#if PERFETTO_DCHECK_IS_ON()
+ uint32_t generation_ = 0;
+#endif
+};
+
+} // namespace base
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_BASE_CIRCULAR_QUEUE_H_
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 8b285f2..4546cca 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -138,6 +138,7 @@
deps += [ ":android_task_runner" ]
}
sources = [
+ "circular_queue_unittest.cc",
"optional_unittest.cc",
"paged_memory_unittest.cc",
"scoped_file_unittest.cc",
diff --git a/src/base/circular_queue_unittest.cc b/src/base/circular_queue_unittest.cc
new file mode 100644
index 0000000..cdb0ef7
--- /dev/null
+++ b/src/base/circular_queue_unittest.cc
@@ -0,0 +1,286 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/circular_queue.h"
+
+#include <algorithm>
+#include <random>
+#include <utility>
+
+#include "gtest/gtest.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+TEST(CircularQueueTest, Int) {
+ CircularQueue<int> queue(/*initial_capacity=*/1);
+ ASSERT_EQ(queue.size(), 0u);
+ queue.emplace_back(101);
+ ASSERT_EQ(queue.size(), 1u);
+ queue.emplace_back(102);
+ queue.emplace_back(103);
+ queue.emplace_back(104);
+ ASSERT_EQ(queue.size(), 4u);
+
+ auto it = queue.begin();
+ for (int i = 101; i <= 104; i++) {
+ ASSERT_EQ(*it, i);
+ it++;
+ }
+ ASSERT_EQ(it, queue.end());
+
+ queue.erase_front(1);
+ ASSERT_EQ(queue.size(), 3u);
+ ASSERT_EQ(*queue.begin(), 102);
+
+ *(queue.begin() + 1) = 42;
+ ASSERT_EQ(*(queue.end() - 2), 42);
+
+ queue.erase_front(2);
+ ASSERT_EQ(queue.size(), 1u);
+ ASSERT_EQ(*queue.begin(), 104);
+
+ queue.pop_front();
+ ASSERT_EQ(queue.size(), 0u);
+ ASSERT_EQ(queue.begin(), queue.end());
+
+ const int kNumInts = 100000;
+
+ {
+ std::minstd_rand0 rnd_engine(0);
+ for (int i = 0; i < kNumInts; i++) {
+ int n = static_cast<int>(rnd_engine());
+ queue.emplace_back(n);
+ }
+ }
+  ASSERT_EQ(queue.size(), static_cast<size_t>(kNumInts));
+ ASSERT_EQ(queue.end() - queue.begin(), kNumInts);
+ {
+ std::minstd_rand0 rnd_engine(0);
+ it = queue.begin();
+ for (int i = 0; i < kNumInts; ++i, ++it) {
+ ASSERT_LT(it, queue.end());
+ ASSERT_EQ(*it, static_cast<int>(rnd_engine()));
+ }
+ }
+
+ {
+ std::minstd_rand0 del_rnd(42);
+ std::minstd_rand0 rnd_engine(0);
+ while (!queue.empty()) {
+ ASSERT_EQ(*queue.begin(), static_cast<int>(rnd_engine()));
+ size_t num_del = (del_rnd() % 8) + 1;
+ queue.erase_front(num_del + 1); // +1 because of the read 2 lines above.
+ rnd_engine.discard(num_del);
+ }
+ }
+}
+
+TEST(CircularQueueTest, Sorting) {
+ CircularQueue<uint64_t> queue;
+ std::minstd_rand0 rnd_engine(0);
+ for (int i = 0; i < 100000; i++) {
+ queue.emplace_back(static_cast<uint64_t>(rnd_engine()));
+ if ((i % 100) == 0)
+ queue.erase_front(29);
+ }
+ ASSERT_FALSE(std::is_sorted(queue.begin(), queue.end()));
+ std::sort(queue.begin(), queue.end());
+ ASSERT_TRUE(std::is_sorted(queue.begin(), queue.end()));
+}
+
+TEST(CircularQueueTest, MoveOperators) {
+ CircularQueue<int> queue;
+ queue.emplace_back(1);
+ queue.emplace_back(2);
+
+ {
+ CircularQueue<int> moved(std::move(queue));
+ ASSERT_TRUE(queue.empty());
+ ASSERT_EQ(moved.size(), 2u);
+
+ moved.emplace_back(3);
+ moved.emplace_back(4);
+ ASSERT_EQ(moved.size(), 4u);
+ }
+ queue.emplace_back(10);
+ queue.emplace_back(11);
+ queue.emplace_back(12);
+ ASSERT_EQ(queue.size(), 3u);
+ ASSERT_EQ(queue.front(), 10);
+ ASSERT_EQ(queue.back(), 12);
+
+ {
+ CircularQueue<int> moved;
+ moved.emplace_back(42);
+ moved = std::move(queue);
+ ASSERT_TRUE(queue.empty());
+ ASSERT_EQ(moved.size(), 3u);
+ ASSERT_EQ(moved.front(), 10);
+ ASSERT_EQ(moved.back(), 12);
+ }
+}
+
+TEST(CircularQueueTest, Iterators) {
+ for (size_t repeat = 1; repeat < 8; repeat++) {
+ size_t capacity = 8 * (1 << repeat);
+ CircularQueue<size_t> queue(capacity);
+ for (size_t i = 0; i < capacity - 2; i++)
+ queue.emplace_back(0u);
+ queue.erase_front(queue.size());
+ ASSERT_TRUE(queue.empty());
+ ASSERT_EQ(queue.capacity(), capacity);
+
+    // Now the queue is empty and the internal write iterator is about to wrap.
+
+ // Add a bit more than half-capacity and check that the queue didn't resize.
+ for (size_t i = 0; i < capacity / 2 + 3; i++)
+ queue.emplace_back(i);
+ ASSERT_EQ(queue.capacity(), capacity);
+
+ // Check that all iterators are consistent.
+ auto begin = queue.begin();
+ auto end = queue.end();
+ auto mid = begin + std::distance(begin, end) / 2;
+ ASSERT_TRUE(std::is_sorted(begin, end));
+ ASSERT_TRUE(begin < end);
+ ASSERT_TRUE(begin <= begin);
+ ASSERT_TRUE(begin >= begin);
+ ASSERT_FALSE(begin < begin);
+ ASSERT_FALSE(begin > begin);
+ ASSERT_TRUE(begin + 1 > begin);
+ ASSERT_TRUE(begin + 1 >= begin);
+ ASSERT_FALSE(begin >= begin + 1);
+ ASSERT_TRUE(begin <= begin + 1);
+ ASSERT_TRUE(end > mid);
+ ASSERT_TRUE(mid > begin);
+ ASSERT_TRUE(std::is_sorted(begin, mid));
+ ASSERT_TRUE(std::is_sorted(mid, end));
+ }
+}
+
+TEST(CircularQueueTest, ObjectLifetime) {
+ class Checker {
+ public:
+ struct Stats {
+ int num_ctors = 0;
+ int num_dtors = 0;
+ int num_alive = 0;
+ };
+ Checker(Stats* stats, int n) : stats_(stats), n_(n) {
+ EXPECT_GE(n, 0);
+ stats_->num_ctors++;
+ stats_->num_alive++;
+ ptr_ = reinterpret_cast<uintptr_t>(this);
+ }
+
+ ~Checker() {
+ EXPECT_EQ(ptr_, reinterpret_cast<uintptr_t>(this));
+ if (n_ >= 0)
+ stats_->num_alive--;
+ n_ = -1;
+ stats_->num_dtors++;
+ }
+
+ Checker(Checker&& other) noexcept {
+ n_ = other.n_;
+ other.n_ = -1;
+ stats_ = other.stats_;
+ ptr_ = reinterpret_cast<uintptr_t>(this);
+ }
+
+ Checker& operator=(Checker&& other) {
+ new (this) Checker(std::move(other));
+ return *this;
+ }
+
+ Checker(const Checker&) = delete;
+ Checker& operator=(const Checker&) = delete;
+
+ Stats* stats_ = nullptr;
+ uintptr_t ptr_ = 0;
+ int n_ = -1;
+ };
+
+ // Check that dtors are invoked on old elements when growing capacity.
+ Checker::Stats stats;
+ {
+ CircularQueue<Checker> queue(/*initial_capacity=*/2);
+ for (int i = 0; i < 2; i++)
+ queue.emplace_back(&stats, i);
+ ASSERT_EQ(stats.num_ctors, 2);
+ ASSERT_EQ(stats.num_dtors, 0);
+
+ // This further insertion will grow the queue, causing two std::move()s
+ // for the previous elements and one emplace.
+ queue.emplace_back(&stats, 2);
+ ASSERT_EQ(stats.num_ctors, 3);
+
+  // The two old elements that have been std::move()d should be destroyed
+  // upon expansion.
+ ASSERT_EQ(stats.num_dtors, 2);
+ }
+ ASSERT_EQ(stats.num_dtors, 2 + 3);
+
+ stats = Checker::Stats();
+ {
+ CircularQueue<Checker> queue(/*initial_capacity=*/1);
+ for (int i = 0; i < 5; i++)
+ queue.emplace_back(&stats, i);
+ ASSERT_EQ(stats.num_ctors, 5);
+ Checker c5(&stats, 5);
+ queue.emplace_back(std::move(c5));
+ ASSERT_EQ(stats.num_alive, 5 + 1);
+
+ queue.erase_front(2);
+ ASSERT_EQ(stats.num_alive, 5 + 1 - 2);
+
+ for (int i = 0; i < 4; i++)
+ queue.emplace_back(&stats, 10 + i);
+ ASSERT_EQ(stats.num_alive, 5 + 1 - 2 + 4);
+ }
+ ASSERT_EQ(stats.num_ctors, 5 + 1 + 4);
+ ASSERT_EQ(stats.num_alive, 0);
+
+ stats = Checker::Stats();
+ {
+ CircularQueue<Checker> q1(1);
+ CircularQueue<Checker> q2(64);
+ for (int i = 0; i < 100; i++) {
+ q1.emplace_back(&stats, 1000 + i * 2);
+ q2.emplace_back(&stats, 1001 + i * 2);
+ }
+
+ ASSERT_EQ(stats.num_alive, 200);
+
+ for (int i = 0; i < 100; i += 2) {
+ auto it1 = q1.begin() + i;
+ auto it2 = q2.begin() + i;
+ std::swap(*it1, *it2);
+ }
+ auto comparer = [](const Checker& lhs, const Checker& rhs) {
+ return lhs.n_ < rhs.n_;
+ };
+ ASSERT_TRUE(std::is_sorted(q1.begin(), q1.end(), comparer));
+ ASSERT_TRUE(std::is_sorted(q2.begin(), q2.end(), comparer));
+ ASSERT_EQ(stats.num_alive, 200);
+ }
+}
+
+} // namespace
+} // namespace base
+} // namespace perfetto
diff --git a/src/trace_processor/trace_sorter.cc b/src/trace_processor/trace_sorter.cc
index ea17d04..c7a87ee 100644
--- a/src/trace_processor/trace_sorter.cc
+++ b/src/trace_processor/trace_sorter.cc
@@ -76,7 +76,8 @@
// Now erase-front all the expired events that have been pushed by the
// previous loop.
- events_.erase(events_.begin(), flush_end);
+ auto del_sz = static_cast<size_t>(std::distance(events_.begin(), flush_end));
+ events_.erase_front(del_sz);
if (events_.size() > 0) {
earliest_timestamp_ = events_.front().timestamp;
diff --git a/src/trace_processor/trace_sorter.h b/src/trace_processor/trace_sorter.h
index 7bf3781..ab9e2a4 100644
--- a/src/trace_processor/trace_sorter.h
+++ b/src/trace_processor/trace_sorter.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2017 The Android Open Source Project
+ * Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
#include <vector>
+#include "perfetto/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/trace_blob_view.h"
#include "src/trace_processor/trace_processor_context.h"
@@ -63,8 +64,8 @@
struct TimestampedTracePiece {
static constexpr uint32_t kNoCpu = std::numeric_limits<uint32_t>::max();
- TimestampedTracePiece(int64_t a, size_t i, TraceBlobView b, uint32_t c)
- : timestamp(a), packet_idx_(i), blob_view(std::move(b)), cpu(c) {}
+ TimestampedTracePiece(int64_t a, uint32_t i, TraceBlobView b, uint32_t c)
+ : timestamp(a), blob_view(std::move(b)), packet_idx_(i), cpu(c) {}
TimestampedTracePiece(TimestampedTracePiece&&) noexcept = default;
TimestampedTracePiece& operator=(TimestampedTracePiece&&) = default;
@@ -83,8 +84,8 @@
bool is_ftrace() const { return cpu != kNoCpu; }
int64_t timestamp;
- size_t packet_idx_;
TraceBlobView blob_view;
+ uint32_t packet_idx_;
uint32_t cpu;
};
@@ -150,8 +151,7 @@
// things happening here: (1) Sorting the tail of |events_|; (2) Erasing the
// head of |events_| and shifting them left. Both operations become way
// faster if done in large batches (~1M events), where we end up erasing
- // 90% or more of |events_| and the erase-front becomes mainly a memmove of
- // the remaining tail elements. Capping at 1M objectis to avoid holding
+  // 90% or more of |events_|. Capping at 1M objects to avoid holding onto
// too many events in the staging area.
if (optimization_ == OptimizationMode::kMaxBandwidth &&
latest_timestamp_ - earliest_timestamp_ < window_size_ns_ * 10 &&
@@ -162,10 +162,7 @@
SortAndFlushEventsBeyondWindow(window_size_ns_);
}
- // std::deque makes erase-front potentially faster but std::sort slower.
- // Overall seems slower than a vector (350 MB/s vs 400 MB/s) without counting
- // next pipeline stages.
- std::vector<TimestampedTracePiece> events_;
+ base::CircularQueue<TimestampedTracePiece> events_;
TraceProcessorContext* const context_;
OptimizationMode optimization_;
@@ -180,7 +177,7 @@
int64_t earliest_timestamp_ = std::numeric_limits<int64_t>::max();
// Monotonic increasing value used to index timestamped trace pieces.
- size_t packet_idx_ = 0;
+ uint32_t packet_idx_ = 0;
// Contains the index (< events_.size()) of the last sorted event. In essence,
// events_[0..sort_start_idx_] are guaranteed to be in-order, while