trace writer: Add a proxying StartupTraceWriter.
Adds StartupTraceWriter, which facilitates writing trace events during
early app startup while the connection to the perfetto service is
unavailable. The StartupTraceWriter writes into a lock-guarded
temporary local buffer until it is bound to a SMB-backed TraceWriter.
Once bound, any data written to the local buffer is copied to the SMB.
Future writes then proxy directly to the SMB-backed TraceWriter.
Bug: 122507010
Change-Id: I380eb088f42b050f097a6453c3001322c603d285
diff --git a/Android.bp b/Android.bp
index 82f4b33..8f5d8cb 100644
--- a/Android.bp
+++ b/Android.bp
@@ -88,6 +88,7 @@
"src/tracing/core/shared_memory_abi.cc",
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
+ "src/tracing/core/startup_trace_writer.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -273,6 +274,7 @@
"src/tracing/core/shared_memory_abi.cc",
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
+ "src/tracing/core/startup_trace_writer.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -412,6 +414,7 @@
"src/tracing/core/shared_memory_abi.cc",
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
+ "src/tracing/core/startup_trace_writer.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -608,6 +611,7 @@
"src/tracing/core/shared_memory_abi.cc",
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
+ "src/tracing/core/startup_trace_writer.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -2320,6 +2324,7 @@
"src/tracing/core/shared_memory_abi.cc",
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
+ "src/tracing/core/startup_trace_writer.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -2649,6 +2654,8 @@
"src/tracing/core/shared_memory_arbiter_impl_unittest.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
"src/tracing/core/sliced_protobuf_input_stream_unittest.cc",
+ "src/tracing/core/startup_trace_writer.cc",
+ "src/tracing/core/startup_trace_writer_unittest.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
diff --git a/include/perfetto/protozero/message_handle.h b/include/perfetto/protozero/message_handle.h
index 5fbe05b..fd21d2e 100644
--- a/include/perfetto/protozero/message_handle.h
+++ b/include/perfetto/protozero/message_handle.h
@@ -39,6 +39,12 @@
class PERFETTO_EXPORT MessageHandleBase {
public:
+ class FinalizationListener {
+ public:
+ virtual ~FinalizationListener();
+ virtual void OnMessageFinalized(Message* message) = 0;
+ };
+
~MessageHandleBase();
// Move-only type.
@@ -51,6 +57,10 @@
return !!message_;
}
+ void set_finalization_listener(FinalizationListener* listener) {
+ listener_ = listener;
+ }
+
protected:
explicit MessageHandleBase(Message* = nullptr);
Message* operator->() const {
@@ -66,10 +76,27 @@
MessageHandleBase(const MessageHandleBase&) = delete;
MessageHandleBase& operator=(const MessageHandleBase&) = delete;
- void reset_message() { message_ = nullptr; }
+ void reset_message() {
+ // This is called by Message::Finalize().
+ PERFETTO_DCHECK(message_->is_finalized());
+ message_ = nullptr;
+ listener_ = nullptr;
+ }
+
void Move(MessageHandleBase&&);
+ void FinalizeMessage() {
+ // |message_| and |listener_| may be cleared by reset_message() during
+ // Message::Finalize().
+ auto* listener = listener_;
+ auto* message = message_;
+ message->Finalize();
+ if (listener)
+ listener->OnMessageFinalized(message);
+ }
+
Message* message_;
+ FinalizationListener* listener_ = nullptr;
#if PERFETTO_DCHECK_IS_ON()
uint32_t generation_;
#endif
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index 1320683..b09026c 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -27,6 +27,7 @@
"shared_memory_abi.h",
"shared_memory_arbiter.h",
"slice.h",
+ "startup_trace_writer.h",
"trace_config.h",
"trace_packet.h",
"trace_writer.h",
diff --git a/include/perfetto/tracing/core/shared_memory_abi.h b/include/perfetto/tracing/core/shared_memory_abi.h
index bfcac3b..7a5a14c 100644
--- a/include/perfetto/tracing/core/shared_memory_abi.h
+++ b/include/perfetto/tracing/core/shared_memory_abi.h
@@ -392,6 +392,19 @@
return packets.count;
}
+ // Increases |packets.count| to the given |packet_count|, but only if
+ // |packet_count| is larger than the current value of |packets.count|.
+ // Returns the new packet count. Same atomicity guarantees as
+ // IncrementPacketCount().
+ uint16_t IncreasePacketCountTo(uint16_t packet_count) {
+ ChunkHeader* chunk_header = header();
+ auto packets = chunk_header->packets.load(std::memory_order_relaxed);
+ if (packets.count < packet_count)
+ packets.count = packet_count;
+ chunk_header->packets.store(packets, std::memory_order_release);
+ return packets.count;
+ }
+
// Flags are cleared by TryAcquireChunk(), by passing the new header for
// the chunk.
void SetFlag(ChunkHeader::Flags flag) {
@@ -518,6 +531,10 @@
std::pair<size_t, size_t> GetPageAndChunkIndex(const Chunk& chunk);
+ uint16_t GetChunkSizeForLayout(uint32_t page_layout) const {
+ return chunk_sizes_[(page_layout & kLayoutMask) >> kLayoutShift];
+ }
+
static ChunkState GetChunkStateFromLayout(uint32_t page_layout,
size_t chunk_idx) {
return static_cast<ChunkState>((page_layout >> (chunk_idx * kChunkShift)) &
@@ -546,10 +563,6 @@
SharedMemoryABI(const SharedMemoryABI&) = delete;
SharedMemoryABI& operator=(const SharedMemoryABI&) = delete;
- uint16_t GetChunkSizeForLayout(uint32_t page_layout) const {
- return chunk_sizes_[(page_layout & kLayoutMask) >> kLayoutShift];
- }
-
Chunk TryAcquireChunk(size_t page_idx,
size_t chunk_idx,
ChunkState,
diff --git a/include/perfetto/tracing/core/shared_memory_arbiter.h b/include/perfetto/tracing/core/shared_memory_arbiter.h
index 839e9db..e285648 100644
--- a/include/perfetto/tracing/core/shared_memory_arbiter.h
+++ b/include/perfetto/tracing/core/shared_memory_arbiter.h
@@ -34,6 +34,7 @@
}
class CommitDataRequest;
+class StartupTraceWriter;
class SharedMemory;
class TraceWriter;
@@ -50,6 +51,15 @@
virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID target_buffer) = 0;
+ // Binds the provided unbound StartupTraceWriter to a new TraceWriter
+ // associated with the arbiter's SMB. Returns |false| if binding failed
+ // because the writer is concurrently writing data to its temporary buffer. In
+ // this case, the caller should retry (it is free to try again immediately or
+ // schedule a wakeup to retry later).
+ virtual bool BindStartupTraceWriter(StartupTraceWriter* writer,
+ BufferID target_buffer)
+ PERFETTO_WARN_UNUSED_RESULT = 0;
+
// Notifies the service that all data for the given FlushRequestID has been
// committed in the shared memory buffer.
virtual void NotifyFlushComplete(FlushRequestID) = 0;
diff --git a/include/perfetto/tracing/core/startup_trace_writer.h b/include/perfetto/tracing/core/startup_trace_writer.h
new file mode 100644
index 0000000..d4d0f36
--- /dev/null
+++ b/include/perfetto/tracing/core/startup_trace_writer.h
@@ -0,0 +1,155 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_H_
+
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "perfetto/base/export.h"
+#include "perfetto/base/optional.h"
+#include "perfetto/base/thread_checker.h"
+#include "perfetto/protozero/message_handle.h"
+#include "perfetto/protozero/scattered_heap_buffer.h"
+#include "perfetto/protozero/scattered_stream_writer.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/trace_writer.h"
+
+namespace perfetto {
+
+class SharedMemoryArbiterImpl;
+
+namespace protos {
+namespace pbzero {
+class TracePacket;
+} // namespace pbzero
+} // namespace protos
+
+// Facilitates writing trace events in early phases of an application's startup
+// when the perfetto service is not available yet.
+//
+// Until the service is available, producer threads instantiate an unbound
+// StartupTraceWriter instance and use it to emit trace events. Each writer will
+// record the serialized trace events into a temporary local memory buffer.
+//
+// Once the service is available, the producer binds each StartupTraceWriter to
+// the SMB by calling SharedMemoryArbiter::BindStartupTraceWriter(). The data in
+// the writer's local buffer will then be copied into the SMB and the any future
+// writes will proxy directly to a new SMB-backed TraceWriter.
+//
+// Writing to the temporary local trace buffer is guarded by a lock and flag to
+// allow binding the writer from a different thread. When the writer starts
+// writing data by calling NewTracePacket(), the writer thread acquires the lock
+// to set a flag indicating that a write is in progress. Once the packet is
+// finalized, the flag is reset. To bind the writer, the lock is acquired while
+// the flag is unset and released only once binding completed, thereby blocking
+// the writer thread from starting a write concurrently.
+//
+// While unbound, the writer thread should finalize each TracePacket as soon as
+// possible to ensure that it doesn't block binding the writer.
+class PERFETTO_EXPORT StartupTraceWriter
+ : public TraceWriter,
+ public protozero::MessageHandleBase::FinalizationListener {
+ public:
+ // Create an unbound StartupTraceWriter that can later be bound by calling
+ // BindToTraceWriter().
+ StartupTraceWriter();
+
+ // Create a StartupTraceWriter bound to |trace_writer|. Should only be called
+ // on the writer thread.
+ explicit StartupTraceWriter(std::unique_ptr<TraceWriter> trace_writer);
+
+ ~StartupTraceWriter() override;
+
+ // TraceWriter implementation. These methods should only be called on the
+ // writer thread.
+ TracePacketHandle NewTracePacket() override;
+ void Flush(std::function<void()> callback = {}) override;
+
+ // Note that this will return 0 until the first TracePacket was started after
+ // binding.
+ WriterID writer_id() const override;
+
+ // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
+ // Called by SharedMemoryArbiterImpl::BindStartupTraceWriter().
+ //
+ // This method can be called on any thread. If any data was written locally
+ // before the writer was bound, BindToArbiter() will copy this data into
+ // chunks in the provided target buffer via the SMB. Any future packets will
+ // be directly written into the SMB via a newly obtained TraceWriter from the
+ // arbiter.
+ //
+ // Will fail and return |false| if a concurrent write is in progress.
+ bool BindToArbiter(SharedMemoryArbiterImpl*,
+ BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
+
+ // Returns |true| if the writer thread has observed that the writer was bound
+ // to an SMB. Should only be called on the writer thread.
+ //
+ // The writer thread can use the return value to determine whether it needs to
+ // finalize the current TracePacket as soon as possible. It is only safe for
+ // the writer to batch data into a single TracePacket over a longer time
+ // period when this returns |true|.
+ bool was_bound() const {
+ PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+ return was_bound_;
+ }
+
+ // Should only be called on the writer thread.
+ size_t used_buffer_size();
+
+ private:
+ // protozero::MessageHandleBase::FinalizationListener implementation.
+ void OnMessageFinalized(protozero::Message* message) override;
+
+ void OnTracePacketCompleted();
+ ChunkID CommitLocalBufferChunks(SharedMemoryArbiterImpl*, WriterID, BufferID);
+
+ PERFETTO_THREAD_CHECKER(writer_thread_checker_)
+
+ // Only set and accessed from the writer thread. The writer thread flips this
+ // bit when it sees that trace_writer_ is set (while holding the lock).
+ // Caching this fact in this variable avoids the need to acquire the lock to
+ // check on later calls to NewTracePacket().
+ bool was_bound_ = false;
+
+ // All variables below this point are protected by |lock_|.
+ std::mutex lock_;
+
+ // Never reset once it is changed from |nullptr|.
+ std::unique_ptr<TraceWriter> trace_writer_ = nullptr;
+
+ // Local memory buffer for trace packets written before the writer is bound.
+ std::unique_ptr<protozero::ScatteredHeapBuffer> memory_buffer_;
+ std::unique_ptr<protozero::ScatteredStreamWriter> memory_stream_writer_;
+
+ std::vector<uint32_t> packet_sizes_;
+ size_t total_payload_size = 0;
+
+ // Whether the writer thread is currently writing a TracePacket.
+ bool write_in_progress_ = false;
+
+ // The packet returned via NewTracePacket() while the writer is unbound. Reset
+ // to |nullptr| once bound. Owned by this class, TracePacketHandle has just a
+ // pointer to it.
+ std::unique_ptr<protos::pbzero::TracePacket> cur_packet_;
+};
+
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_H_
diff --git a/include/perfetto/tracing/core/trace_writer.h b/include/perfetto/tracing/core/trace_writer.h
index 080d01d..ef864ba 100644
--- a/include/perfetto/tracing/core/trace_writer.h
+++ b/include/perfetto/tracing/core/trace_writer.h
@@ -77,6 +77,13 @@
virtual WriterID writer_id() const = 0;
+ // Set the id of the first chunk the writer will emit. Returns |false| if not
+ // implemented or if the first chunk was already emitted by the writer.
+ //
+ // StartupTraceWriter will call this if it committed buffered data on
+ // behalf of the TraceWriter.
+ virtual bool SetFirstChunkId(ChunkID);
+
private:
TraceWriter(const TraceWriter&) = delete;
TraceWriter& operator=(const TraceWriter&) = delete;
diff --git a/src/protozero/message_handle.cc b/src/protozero/message_handle.cc
index d8e5ad2..79c402d 100644
--- a/src/protozero/message_handle.cc
+++ b/src/protozero/message_handle.cc
@@ -22,6 +22,8 @@
namespace protozero {
+MessageHandleBase::FinalizationListener::~FinalizationListener() {}
+
MessageHandleBase::MessageHandleBase(Message* message) : message_(message) {
#if PERFETTO_DCHECK_IS_ON()
generation_ = message_ ? message->generation_ : 0;
@@ -35,7 +37,7 @@
#if PERFETTO_DCHECK_IS_ON()
PERFETTO_DCHECK(generation_ == message_->generation_);
#endif
- message_->Finalize();
+ FinalizeMessage();
}
}
@@ -48,7 +50,7 @@
// one, finalize the old message. However, if the other message is the same as
// the one we point to, don't finalize.
if (message_ && message_ != other.message_)
- message_->Finalize();
+ FinalizeMessage();
Move(std::move(other));
return *this;
}
@@ -56,6 +58,8 @@
void MessageHandleBase::Move(MessageHandleBase&& other) {
message_ = other.message_;
other.message_ = nullptr;
+ listener_ = other.listener_;
+ other.listener_ = nullptr;
#if PERFETTO_DCHECK_IS_ON()
if (message_) {
generation_ = message_->generation_;
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index b36e073..ae49d44 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -54,6 +54,7 @@
"core/shared_memory_arbiter_impl.h",
"core/sliced_protobuf_input_stream.cc",
"core/sliced_protobuf_input_stream.h",
+ "core/startup_trace_writer.cc",
"core/sys_stats_config.cc",
"core/test_config.cc",
"core/trace_buffer.cc",
@@ -112,7 +113,9 @@
sources += [
"core/service_impl_unittest.cc",
"core/shared_memory_arbiter_impl_unittest.cc",
+ "core/startup_trace_writer_unittest.cc",
"core/trace_writer_impl_unittest.cc",
+ "test/fake_producer_endpoint.h",
"test/mock_consumer.cc",
"test/mock_consumer.h",
"test/mock_producer.cc",
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index d46730a..e4d0e2d 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -21,6 +21,7 @@
#include "perfetto/base/time.h"
#include "perfetto/tracing/core/commit_data_request.h"
#include "perfetto/tracing/core/shared_memory.h"
+#include "perfetto/tracing/core/startup_trace_writer.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"
@@ -275,6 +276,11 @@
new TraceWriterImpl(this, id, target_buffer));
}
+bool SharedMemoryArbiterImpl::BindStartupTraceWriter(StartupTraceWriter* writer,
+ BufferID target_buffer) {
+ return writer->BindToArbiter(this, target_buffer);
+}
+
void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
bool should_post_commit_task = false;
{
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 7507a9a..0ef8abe 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -36,6 +36,7 @@
class CommitDataRequest;
class PatchList;
class TraceWriter;
+class TraceWriterImpl;
namespace base {
class TaskRunner;
@@ -103,7 +104,9 @@
// SharedMemoryArbiter implementation.
// See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
std::unique_ptr<TraceWriter> CreateTraceWriter(
- BufferID target_buffer = 0) override;
+ BufferID target_buffer) override;
+ bool BindStartupTraceWriter(StartupTraceWriter* writer,
+ BufferID target_buffer) override;
void NotifyFlushComplete(FlushRequestID) override;
diff --git a/src/tracing/core/startup_trace_writer.cc b/src/tracing/core/startup_trace_writer.cc
new file mode 100644
index 0000000..473f04a
--- /dev/null
+++ b/src/tracing/core/startup_trace_writer.cc
@@ -0,0 +1,322 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/startup_trace_writer.h"
+
+#include "perfetto/base/logging.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "src/tracing/core/patch_list.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
+
+using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
+
+namespace perfetto {
+
+namespace {
+
+SharedMemoryABI::Chunk NewChunk(SharedMemoryArbiterImpl* arbiter,
+ WriterID writer_id,
+ ChunkID chunk_id,
+ bool fragmenting_packet) {
+ ChunkHeader::Packets packets = {};
+ if (fragmenting_packet) {
+ packets.count = 1;
+ packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
+ }
+
+ // The memory order of the stores below doesn't really matter. This |header|
+ // is just a local temporary object. The GetNewChunk() call below will copy it
+ // into the shared buffer with the proper barriers.
+ ChunkHeader header = {};
+ header.writer_id.store(writer_id, std::memory_order_relaxed);
+ header.chunk_id.store(chunk_id, std::memory_order_relaxed);
+ header.packets.store(packets, std::memory_order_relaxed);
+
+ return arbiter->GetNewChunk(header);
+}
+
+class LocalBufferReader {
+ public:
+ LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
+ : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}
+
+ size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk, size_t num_bytes) {
+ PERFETTO_CHECK(target_chunk->payload_size() >= num_bytes);
+ uint8_t* chunk_payload = target_chunk->payload_begin();
+ size_t bytes_read = 0;
+ while (bytes_read < num_bytes) {
+ if (cur_slice_ == buffer_slices_.end())
+ return bytes_read;
+
+ auto cur_slice_range = cur_slice_->GetUsedRange();
+
+ if (cur_slice_range.size() == cur_slice_offset_) {
+ cur_slice_offset_ = 0;
+ cur_slice_++;
+ continue;
+ }
+
+ size_t read_size = std::min(num_bytes - bytes_read,
+ cur_slice_range.size() - cur_slice_offset_);
+ memcpy(chunk_payload + bytes_read,
+ cur_slice_range.begin + cur_slice_offset_, read_size);
+ cur_slice_offset_ += read_size;
+ bytes_read += read_size;
+
+ // Should have either read all of the chunk or completed reading now.
+ PERFETTO_DCHECK(cur_slice_offset_ == cur_slice_range.size() ||
+ bytes_read == num_bytes);
+ }
+ return bytes_read;
+ }
+
+ private:
+ const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;
+
+ // Iterator pointing to slice in |buffer_slices_| that we're currently reading
+ // from.
+ std::vector<protozero::ScatteredHeapBuffer::Slice>::const_iterator cur_slice_;
+ // Read offset in the current slice in bytes.
+ size_t cur_slice_offset_ = 0;
+};
+
+} // namespace
+
+StartupTraceWriter::StartupTraceWriter()
+ : memory_buffer_(new protozero::ScatteredHeapBuffer()),
+ memory_stream_writer_(
+ new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
+ memory_buffer_->set_writer(memory_stream_writer_.get());
+ PERFETTO_DETACH_FROM_THREAD(writer_thread_checker_);
+}
+
+StartupTraceWriter::StartupTraceWriter(
+ std::unique_ptr<TraceWriter> trace_writer)
+ : was_bound_(true), trace_writer_(std::move(trace_writer)) {}
+
+StartupTraceWriter::~StartupTraceWriter() = default;
+
+bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
+ BufferID target_buffer) {
+ std::lock_guard<std::mutex> lock(lock_);
+
+ PERFETTO_DCHECK(!trace_writer_);
+
+ // Can't bind while the writer thread is writing.
+ if (write_in_progress_)
+ return false;
+
+ // If there's a pending trace packet, it should have been completed by the
+ // writer thread before write_in_progress_ is reset.
+ if (cur_packet_) {
+ PERFETTO_DCHECK(cur_packet_->is_finalized());
+ cur_packet_.reset();
+ }
+ memory_stream_writer_.reset();
+
+ trace_writer_ = arbiter->CreateTraceWriter(target_buffer);
+
+ ChunkID next_chunk_id = CommitLocalBufferChunks(
+ arbiter, trace_writer_->writer_id(), target_buffer);
+ memory_buffer_.reset();
+
+ // The real TraceWriter should start writing at the subsequent chunk ID.
+ bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
+ PERFETTO_DCHECK(success);
+
+ return true;
+}
+
+TraceWriter::TracePacketHandle StartupTraceWriter::NewTracePacket() {
+ PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+
+ // Check if we are already bound without grabbing the lock. This is an
+ // optimization to avoid any locking in the common case where the proxy was
+ // bound some time ago.
+ if (PERFETTO_LIKELY(was_bound_)) {
+ PERFETTO_DCHECK(!cur_packet_);
+ PERFETTO_DCHECK(trace_writer_);
+ return trace_writer_->NewTracePacket();
+ }
+
+ // Now grab the lock and safely check whether we are still unbound.
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (trace_writer_) {
+ PERFETTO_DCHECK(!cur_packet_);
+ // Set the |was_bound_| flag to avoid locking in future calls to
+ // NewTracePacket().
+ was_bound_ = true;
+ return trace_writer_->NewTracePacket();
+ }
+ // Not bound. Make sure it stays this way until the TracePacketHandle goes
+ // out of scope by setting |write_in_progress_|.
+ PERFETTO_DCHECK(!write_in_progress_);
+ write_in_progress_ = true;
+ }
+
+ // Write to the local buffer.
+ if (cur_packet_) {
+ // If we hit this, the caller is calling NewTracePacket() without having
+ // finalized the previous packet.
+ PERFETTO_DCHECK(cur_packet_->is_finalized());
+ } else {
+ cur_packet_.reset(new protos::pbzero::TracePacket());
+ }
+ cur_packet_->Reset(memory_stream_writer_.get());
+ TraceWriter::TracePacketHandle handle(cur_packet_.get());
+ // |this| outlives the packet handle.
+ handle.set_finalization_listener(this);
+ return handle;
+}
+
+void StartupTraceWriter::Flush(std::function<void()> callback) {
+ PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+ // It's fine to check |was_bound_| instead of acquiring the lock because
+ // |trace_writer_| will only need flushing after the first trace packet was
+ // written to it and |was_bound_| is set.
+ if (PERFETTO_LIKELY(was_bound_)) {
+ PERFETTO_DCHECK(trace_writer_);
+ return trace_writer_->Flush(std::move(callback));
+ }
+
+ // Can't flush while unbound.
+ callback();
+}
+
+WriterID StartupTraceWriter::writer_id() const {
+ PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+ // We can't acquire the lock because this is a const method. So we'll only
+ // proxy to |trace_writer_| once we have written the first packet to it
+ // instead.
+ if (PERFETTO_LIKELY(was_bound_)) {
+ PERFETTO_DCHECK(trace_writer_);
+ return trace_writer_->writer_id();
+ }
+ return 0;
+}
+
+size_t StartupTraceWriter::used_buffer_size() {
+ PERFETTO_DCHECK_THREAD(writer_thread_checker_);
+ if (PERFETTO_LIKELY(was_bound_))
+ return 0;
+
+ std::lock_guard<std::mutex> lock(lock_);
+ if (trace_writer_)
+ return 0;
+
+ size_t used_size = 0;
+ memory_buffer_->AdjustUsedSizeOfCurrentSlice();
+ for (const auto& slice : memory_buffer_->slices()) {
+ used_size += slice.GetUsedRange().size();
+ }
+ return used_size;
+}
+
+void StartupTraceWriter::OnMessageFinalized(protozero::Message* message) {
+ PERFETTO_DCHECK(cur_packet_.get() == message);
+ PERFETTO_DCHECK(cur_packet_->is_finalized());
+ // Finalize() is a no-op because the packet is already finalized.
+ uint32_t packet_size = cur_packet_->Finalize();
+ packet_sizes_.push_back(packet_size);
+ total_payload_size += packet_size;
+
+ // Write is complete, reset the flag to allow binding.
+ std::lock_guard<std::mutex> lock(lock_);
+ PERFETTO_DCHECK(write_in_progress_);
+ write_in_progress_ = false;
+}
+
+ChunkID StartupTraceWriter::CommitLocalBufferChunks(
+ SharedMemoryArbiterImpl* arbiter,
+ WriterID writer_id,
+ BufferID target_buffer) {
+ // TODO(eseckler): Write and commit these chunks asynchronously. This would
+ // require that the service is informed of the missing initial chunks, e.g. by
+ // committing our first chunk here before the new trace writer has a chance to
+ // commit its first chunk. Otherwise the service wouldn't know to wait for our
+ // chunks.
+
+ if (packet_sizes_.empty() || !writer_id)
+ return 0;
+
+ LocalBufferReader local_buffer_reader(memory_buffer_.get());
+
+ ChunkID next_chunk_id = 0;
+ SharedMemoryABI::Chunk cur_chunk =
+ NewChunk(arbiter, writer_id, next_chunk_id++, false);
+
+ size_t max_payload_size = cur_chunk.payload_size();
+ size_t cur_payload_size = 0;
+ uint16_t cur_num_packets = 0;
+ size_t total_num_packets = packet_sizes_.size();
+ PatchList empty_patch_list;
+ for (size_t packet_idx = 0; packet_idx < total_num_packets; packet_idx++) {
+ uint32_t packet_size = packet_sizes_[packet_idx];
+ uint32_t remaining_packet_size = packet_size;
+ ++cur_num_packets;
+ do {
+ uint32_t fragment_size = static_cast<uint32_t>(
+ std::min(static_cast<size_t>(remaining_packet_size),
+ max_payload_size - cur_payload_size));
+ cur_payload_size += fragment_size;
+ remaining_packet_size -= fragment_size;
+
+ bool last_write =
+ packet_idx == total_num_packets - 1 && remaining_packet_size == 0;
+
+ // Find num_packets that we should copy into current chunk and their
+ // payload_size.
+ bool write_chunk = cur_num_packets == ChunkHeader::Packets::kMaxCount ||
+ cur_payload_size == max_payload_size || last_write;
+
+ if (write_chunk) {
+ // Write chunk payload.
+ local_buffer_reader.ReadBytes(&cur_chunk, cur_payload_size);
+
+ auto new_packet_count =
+ cur_chunk.IncreasePacketCountTo(cur_num_packets);
+ PERFETTO_DCHECK(new_packet_count == cur_num_packets);
+
+ bool is_fragmenting = remaining_packet_size > 0;
+ if (is_fragmenting)
+ cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
+
+ arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
+ &empty_patch_list);
+
+ // Avoid creating a new chunk after the last write.
+ if (!last_write) {
+ cur_chunk =
+ NewChunk(arbiter, writer_id, next_chunk_id++, is_fragmenting);
+ max_payload_size = cur_chunk.payload_size();
+ cur_payload_size = 0;
+ cur_num_packets = is_fragmenting ? 1 : 0;
+ } else {
+ PERFETTO_DCHECK(!is_fragmenting);
+ }
+ }
+ } while (remaining_packet_size > 0);
+ }
+
+ // The last chunk should have been returned.
+ PERFETTO_DCHECK(!cur_chunk.is_valid());
+
+ return next_chunk_id;
+}
+
+} // namespace perfetto
diff --git a/src/tracing/core/startup_trace_writer_unittest.cc b/src/tracing/core/startup_trace_writer_unittest.cc
new file mode 100644
index 0000000..7a6d973
--- /dev/null
+++ b/src/tracing/core/startup_trace_writer_unittest.cc
@@ -0,0 +1,218 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/startup_trace_writer.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/tracing/core/tracing_service.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
+#include "src/tracing/test/aligned_buffer_test.h"
+#include "src/tracing/test/fake_producer_endpoint.h"
+
+#include "perfetto/trace/test_event.pbzero.h"
+#include "perfetto/trace/trace_packet.pbzero.h"
+
+namespace perfetto {
+namespace {
+
+class StartupTraceWriterTest : public AlignedBufferTest {
+ public:
+ void SetUp() override {
+ SharedMemoryArbiterImpl::set_default_layout_for_testing(
+ SharedMemoryABI::PageLayout::kPageDiv4);
+ AlignedBufferTest::SetUp();
+ task_runner_.reset(new base::TestTaskRunner());
+ arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
+ &fake_producer_endpoint_,
+ task_runner_.get()));
+ }
+
+ void TearDown() override {
+ arbiter_.reset();
+ task_runner_.reset();
+ }
+
+ void WritePackets(StartupTraceWriter* writer, size_t packet_count) {
+ for (size_t i = 0; i < packet_count; i++) {
+ auto packet = writer->NewTracePacket();
+ packet->set_for_testing()->set_str("foo");
+ }
+ }
+
+ void VerifyPacketCount(size_t expected_count) {
+ SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
+ size_t packets_count = 0;
+ ChunkID current_max_chunk_id = 0;
+ for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
+ uint32_t page_layout = abi->GetPageLayout(page_idx);
+ size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
+ for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+ auto chunk_state = abi->GetChunkState(page_idx, chunk_idx);
+ ASSERT_TRUE(chunk_state == SharedMemoryABI::kChunkFree ||
+ chunk_state == SharedMemoryABI::kChunkComplete);
+ auto chunk = abi->TryAcquireChunkForReading(page_idx, chunk_idx);
+ if (!chunk.is_valid())
+ continue;
+
+ // Should only see new chunks with IDs larger than the previous read
+ // since our reads and writes are serialized.
+ ChunkID chunk_id = chunk.header()->chunk_id.load();
+ if (last_read_max_chunk_id_ != 0)
+ EXPECT_LT(last_read_max_chunk_id_, chunk_id);
+ current_max_chunk_id = std::max(current_max_chunk_id, chunk_id);
+
+ auto packets_header = chunk.header()->packets.load();
+ packets_count += packets_header.count;
+ if (packets_header.flags &
+ SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk) {
+ // Don't count fragmented packets twice.
+ packets_count--;
+ }
+ abi->ReleaseChunkAsFree(std::move(chunk));
+ }
+ }
+ last_read_max_chunk_id_ = current_max_chunk_id;
+ EXPECT_EQ(expected_count, packets_count);
+ }
+
+ FakeProducerEndpoint fake_producer_endpoint_;
+ std::unique_ptr<base::TestTaskRunner> task_runner_;
+ std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
+ std::function<void(const std::vector<uint32_t>&)> on_pages_complete_;
+
+ ChunkID last_read_max_chunk_id_ = 0;
+};
+
+size_t const kPageSizes[] = {4096, 65536};
+INSTANTIATE_TEST_CASE_P(PageSize,
+ StartupTraceWriterTest,
+ ::testing::ValuesIn(kPageSizes));
+
+TEST_P(StartupTraceWriterTest, CreateUnboundAndBind) {
+ // Create an unbound writer.
+ std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+ // Bind it right away without having written any data before.
+ const BufferID kBufId = 42;
+ EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+
+ const size_t kNumPackets = 32;
+ WritePackets(writer.get(), kNumPackets);
+ // Finalizes the last packet and returns the chunk.
+ writer.reset();
+
+ VerifyPacketCount(kNumPackets);
+}
+
+TEST_P(StartupTraceWriterTest, CreateBound) {
+ // Create a bound writer immediately.
+ const BufferID kBufId = 42;
+ std::unique_ptr<StartupTraceWriter> writer(
+ new StartupTraceWriter(arbiter_->CreateTraceWriter(kBufId)));
+
+ const size_t kNumPackets = 32;
+ WritePackets(writer.get(), kNumPackets);
+ // Finalizes the last packet and returns the chunk.
+ writer.reset();
+
+ VerifyPacketCount(kNumPackets);
+}
+
+TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndDiscard) {
+ // Create an unbound writer.
+ std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+ const size_t kNumPackets = 32;
+ WritePackets(writer.get(), kNumPackets);
+
+ // Should discard the written data.
+ writer.reset();
+
+ VerifyPacketCount(0);
+}
+
+TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndBind) {
+ // Create an unbound writer.
+ std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+ const size_t kNumPackets = 32;
+ WritePackets(writer.get(), kNumPackets);
+
+ // Binding the writer should cause the previously written packets to be
+ // written to the SMB and committed.
+ const BufferID kBufId = 42;
+ EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+
+ VerifyPacketCount(kNumPackets);
+
+ // Any further packets should be written to the SMB directly.
+ const size_t kNumAdditionalPackets = 16;
+ WritePackets(writer.get(), kNumAdditionalPackets);
+ // Finalizes the last packet and returns the chunk.
+ writer.reset();
+
+ VerifyPacketCount(kNumAdditionalPackets);
+}
+
+TEST_P(StartupTraceWriterTest, WriteMultipleChunksWhileUnboundAndBind) {
+ // Create an unbound writer.
+ std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+ // Write a single packet to determine its size in the buffer.
+ WritePackets(writer.get(), 1);
+ size_t packet_size = writer->used_buffer_size();
+
+ // Write at least 3 pages worth of packets.
+ const size_t kNumPackets = (page_size() * 3 + packet_size - 1) / packet_size;
+ WritePackets(writer.get(), kNumPackets);
+
+ // Binding the writer should cause the previously written packets to be
+ // written to the SMB and committed.
+ const BufferID kBufId = 42;
+ EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+
+ VerifyPacketCount(kNumPackets + 1);
+
+ // Any further packets should be written to the SMB directly.
+ const size_t kNumAdditionalPackets = 16;
+ WritePackets(writer.get(), kNumAdditionalPackets);
+ // Finalizes the last packet and returns the chunk.
+ writer.reset();
+
+ VerifyPacketCount(kNumAdditionalPackets);
+}
+
+TEST_P(StartupTraceWriterTest, BindingWhileWritingFails) {
+ // Create an unbound writer.
+ std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+
+ const BufferID kBufId = 42;
+ {
+ // Begin a write by opening a TracePacket
+ auto packet = writer->NewTracePacket();
+
+ // Binding while writing should fail.
+ EXPECT_FALSE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+ }
+
+ // Packet was completed, so binding should work now and emit the packet.
+ EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+ VerifyPacketCount(1);
+}
+
+} // namespace
+} // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index 840588d..fcc29bc 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -220,8 +220,19 @@
return id_;
}
-// Base class ctor/dtor definition.
+bool TraceWriterImpl::SetFirstChunkId(ChunkID chunk_id) {
+ if (next_chunk_id_ > 0)
+ return false;
+ next_chunk_id_ = chunk_id;
+ return true;
+}
+
+// Base class definitions.
TraceWriter::TraceWriter() = default;
TraceWriter::~TraceWriter() = default;
+bool TraceWriter::SetFirstChunkId(ChunkID) {
+ return false;
+}
+
} // namespace perfetto
diff --git a/src/tracing/core/trace_writer_impl.h b/src/tracing/core/trace_writer_impl.h
index 80a3ffe..9e0cdb4 100644
--- a/src/tracing/core/trace_writer_impl.h
+++ b/src/tracing/core/trace_writer_impl.h
@@ -40,6 +40,7 @@
TracePacketHandle NewTracePacket() override;
void Flush(std::function<void()> callback = {}) override;
WriterID writer_id() const override;
+ bool SetFirstChunkId(ChunkID) override;
void ResetChunkForTesting() { cur_chunk_ = SharedMemoryABI::Chunk(); }
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index bf56401..1194052 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -24,6 +24,7 @@
#include "src/base/test/test_task_runner.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
#include "src/tracing/test/aligned_buffer_test.h"
+#include "src/tracing/test/fake_producer_endpoint.h"
#include "perfetto/trace/test_event.pbzero.h"
#include "perfetto/trace/trace_packet.pbzero.h"
@@ -31,26 +32,6 @@
namespace perfetto {
namespace {
-class FakeProducerEndpoint : public TracingService::ProducerEndpoint {
- public:
- void RegisterDataSource(const DataSourceDescriptor&) override {}
- void UnregisterDataSource(const std::string&) override {}
- void RegisterTraceWriter(uint32_t, uint32_t) override {}
- void UnregisterTraceWriter(uint32_t) override {}
- void CommitData(const CommitDataRequest& req, CommitDataCallback) override {
- last_commit_data_request = req;
- }
- void NotifyFlushComplete(FlushRequestID) override {}
- void NotifyDataSourceStopped(DataSourceInstanceID) override {}
- SharedMemory* shared_memory() const override { return nullptr; }
- size_t shared_buffer_page_size_kb() const override { return 0; }
- std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
- return nullptr;
- }
-
- CommitDataRequest last_commit_data_request;
-};
-
class TraceWriterImplTest : public AlignedBufferTest {
public:
void SetUp() override {
diff --git a/src/tracing/test/fake_producer_endpoint.h b/src/tracing/test/fake_producer_endpoint.h
new file mode 100644
index 0000000..478ffe8
--- /dev/null
+++ b/src/tracing/test/fake_producer_endpoint.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_TEST_FAKE_PRODUCER_ENDPOINT_H_
+#define SRC_TRACING_TEST_FAKE_PRODUCER_ENDPOINT_H_
+
+#include "perfetto/tracing/core/commit_data_request.h"
+#include "perfetto/tracing/core/tracing_service.h"
+
+namespace perfetto {
+
+class FakeProducerEndpoint : public TracingService::ProducerEndpoint {
+ public:
+ void RegisterDataSource(const DataSourceDescriptor&) override {}
+ void UnregisterDataSource(const std::string&) override {}
+ void RegisterTraceWriter(uint32_t, uint32_t) override {}
+ void UnregisterTraceWriter(uint32_t) override {}
+ void CommitData(const CommitDataRequest& req, CommitDataCallback) override {
+ last_commit_data_request = req;
+ }
+ void NotifyFlushComplete(FlushRequestID) override {}
+ void NotifyDataSourceStopped(DataSourceInstanceID) override {}
+ SharedMemory* shared_memory() const override { return nullptr; }
+ size_t shared_buffer_page_size_kb() const override { return 0; }
+ std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
+ return nullptr;
+ }
+
+ CommitDataRequest last_commit_data_request;
+};
+
+} // namespace perfetto
+
+#endif // SRC_TRACING_TEST_FAKE_PRODUCER_ENDPOINT_H_