trace writer: Add StartupTraceWriterRegistry.
Adds a registry for StartupTraceWriters that keeps track of any unbound
writers and binds them once the SMB is available.
Also fixes a few bugs in the StartupTraceWriter implementation.
Bug: 122507010
Change-Id: Id8de101d9aa4132467d67be2ded3d30b6b8fd52a
diff --git a/Android.bp b/Android.bp
index 8de77cf..f3df9e5 100644
--- a/Android.bp
+++ b/Android.bp
@@ -89,6 +89,7 @@
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
"src/tracing/core/startup_trace_writer.cc",
+ "src/tracing/core/startup_trace_writer_registry.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -281,6 +282,7 @@
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
"src/tracing/core/startup_trace_writer.cc",
+ "src/tracing/core/startup_trace_writer_registry.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -426,6 +428,7 @@
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
"src/tracing/core/startup_trace_writer.cc",
+ "src/tracing/core/startup_trace_writer_registry.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -624,6 +627,7 @@
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
"src/tracing/core/startup_trace_writer.cc",
+ "src/tracing/core/startup_trace_writer_registry.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -2348,6 +2352,7 @@
"src/tracing/core/shared_memory_arbiter_impl.cc",
"src/tracing/core/sliced_protobuf_input_stream.cc",
"src/tracing/core/startup_trace_writer.cc",
+ "src/tracing/core/startup_trace_writer_registry.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
"src/tracing/core/trace_buffer.cc",
@@ -2681,6 +2686,7 @@
"src/tracing/core/sliced_protobuf_input_stream.cc",
"src/tracing/core/sliced_protobuf_input_stream_unittest.cc",
"src/tracing/core/startup_trace_writer.cc",
+ "src/tracing/core/startup_trace_writer_registry.cc",
"src/tracing/core/startup_trace_writer_unittest.cc",
"src/tracing/core/sys_stats_config.cc",
"src/tracing/core/test_config.cc",
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index c4e7eed..d9e375c 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -28,6 +28,7 @@
"shared_memory_arbiter.h",
"slice.h",
"startup_trace_writer.h",
+ "startup_trace_writer_registry.h",
"trace_config.h",
"trace_packet.h",
"trace_stats.h",
diff --git a/include/perfetto/tracing/core/shared_memory_arbiter.h b/include/perfetto/tracing/core/shared_memory_arbiter.h
index e285648..5dc4c03 100644
--- a/include/perfetto/tracing/core/shared_memory_arbiter.h
+++ b/include/perfetto/tracing/core/shared_memory_arbiter.h
@@ -35,6 +35,7 @@
class CommitDataRequest;
class StartupTraceWriter;
+class StartupTraceWriterRegistry;
class SharedMemory;
class TraceWriter;
@@ -51,14 +52,24 @@
virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID target_buffer) = 0;
- // Binds the provided unbound StartupTraceWriter to a new TraceWriter
- // associated with the arbiter's SMB. Returns |false| if binding failed
- // because the writer is concurrently writing data to its temporary buffer. In
- // this case, the caller should retry (it is free to try again immediately or
- // schedule a wakeup to retry later).
- virtual bool BindStartupTraceWriter(StartupTraceWriter* writer,
- BufferID target_buffer)
- PERFETTO_WARN_UNUSED_RESULT = 0;
+ // Binds the provided unbound StartupTraceWriterRegistry to the arbiter's SMB.
+ // Normally this happens when the perfetto service has been initialized and we
+ // want to rebind all the writers created in the early startup phase.
+ //
+ // All StartupTraceWriters created by the registry are bound to the arbiter
+ // and the given target buffer. The writers may not be bound immediately if
+ // they are concurrently being written to. The registry will retry on the
+ // arbiter's TaskRunner until all writers were bound successfully.
+ //
+ // Should only be called on the passed TaskRunner's sequence. By calling this
+ // method, the registry's ownership is transferred to the arbiter. The arbiter
+ // will delete the registry once all writers were bound.
+ //
+ // TODO(eseckler): Make target buffer assignment more flexible (i.e. per
+ // writer). For now, embedders can use multiple registries instead.
+ virtual void BindStartupTraceWriterRegistry(
+ std::unique_ptr<StartupTraceWriterRegistry>,
+ BufferID target_buffer) = 0;
// Notifies the service that all data for the given FlushRequestID has been
// committed in the shared memory buffer.
diff --git a/include/perfetto/tracing/core/startup_trace_writer.h b/include/perfetto/tracing/core/startup_trace_writer.h
index b93e2f1..d1ad82c 100644
--- a/include/perfetto/tracing/core/startup_trace_writer.h
+++ b/include/perfetto/tracing/core/startup_trace_writer.h
@@ -19,6 +19,7 @@
#include <memory>
#include <mutex>
+#include <set>
#include <vector>
#include "perfetto/base/export.h"
@@ -33,6 +34,7 @@
namespace perfetto {
class SharedMemoryArbiterImpl;
+class StartupTraceWriterRegistryHandle;
namespace protos {
namespace pbzero {
@@ -44,8 +46,9 @@
// when the perfetto service is not available yet.
//
// Until the service is available, producer threads instantiate an unbound
-// StartupTraceWriter instance and use it to emit trace events. Each writer will
-// record the serialized trace events into a temporary local memory buffer.
+// StartupTraceWriter instance (via a StartupTraceWriterRegistry) and use it to
+// emit trace events. Each writer will record the serialized trace events into a
+// temporary local memory buffer.
//
// Once the service is available, the producer binds each StartupTraceWriter to
// the SMB by calling SharedMemoryArbiter::BindStartupTraceWriter(). The data in
@@ -66,10 +69,6 @@
: public TraceWriter,
public protozero::MessageHandleBase::FinalizationListener {
public:
- // Create an unbound StartupTraceWriter that can later be bound by calling
- // BindToTraceWriter().
- StartupTraceWriter();
-
// Create a StartupTraceWriter bound to |trace_writer|. Should only be called
// on the writer thread.
explicit StartupTraceWriter(std::unique_ptr<TraceWriter> trace_writer);
@@ -87,19 +86,6 @@
uint64_t written() const override;
- // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
- // Called by SharedMemoryArbiterImpl::BindStartupTraceWriter().
- //
- // This method can be called on any thread. If any data was written locally
- // before the writer was bound, BindToArbiter() will copy this data into
- // chunks in the provided target buffer via the SMB. Any future packets will
- // be directly written into the SMB via a newly obtained TraceWriter from the
- // arbiter.
- //
- // Will fail and return |false| if a concurrent write is in progress.
- bool BindToArbiter(SharedMemoryArbiterImpl*,
- BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
-
// Returns |true| if the writer thread has observed that the writer was bound
// to an SMB. Should only be called on the writer thread.
//
@@ -116,6 +102,31 @@
size_t used_buffer_size();
private:
+ friend class StartupTraceWriterRegistry;
+ friend class StartupTraceWriterTest;
+
+ // Create an unbound StartupTraceWriter associated with the registry pointed
+ // to by the handle. The writer can later be bound by calling
+ // BindToTraceWriter(). The registry handle may be nullptr in tests.
+ StartupTraceWriter(std::shared_ptr<StartupTraceWriterRegistryHandle>);
+
+ StartupTraceWriter(const StartupTraceWriter&) = delete;
+ StartupTraceWriter& operator=(const StartupTraceWriter&) = delete;
+
+ // Bind this StartupTraceWriter to the provided SharedMemoryArbiterImpl.
+ // Called by StartupTraceWriterRegistry::BindToArbiter().
+ //
+ // This method can be called on any thread. If any data was written locally
+ // before the writer was bound, BindToArbiter() will copy this data into
+ // chunks in the provided target buffer via the SMB. Any future packets will
+ // be directly written into the SMB via a newly obtained TraceWriter from the
+ // arbiter.
+ //
+ // Will fail and return |false| if a concurrent write is in progress. Returns
+ // |true| if successfully bound and should then not be called again.
+ bool BindToArbiter(SharedMemoryArbiterImpl*,
+ BufferID target_buffer) PERFETTO_WARN_UNUSED_RESULT;
+
// protozero::MessageHandleBase::FinalizationListener implementation.
void OnMessageFinalized(protozero::Message* message) override;
@@ -124,6 +135,8 @@
PERFETTO_THREAD_CHECKER(writer_thread_checker_)
+ std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle_;
+
// Only set and accessed from the writer thread. The writer thread flips this
// bit when it sees that trace_writer_ is set (while holding the lock).
// Caching this fact in this variable avoids the need to acquire the lock to
@@ -141,7 +154,6 @@
std::unique_ptr<protozero::ScatteredStreamWriter> memory_stream_writer_;
std::vector<uint32_t> packet_sizes_;
- size_t total_payload_size = 0;
// Whether the writer thread is currently writing a TracePacket.
bool write_in_progress_ = false;
diff --git a/include/perfetto/tracing/core/startup_trace_writer_registry.h b/include/perfetto/tracing/core/startup_trace_writer_registry.h
new file mode 100644
index 0000000..db19862
--- /dev/null
+++ b/include/perfetto/tracing/core/startup_trace_writer_registry.h
@@ -0,0 +1,149 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <vector>
+
+#include "perfetto/base/export.h"
+#include "perfetto/base/weak_ptr.h"
+#include "perfetto/tracing/core/basic_types.h"
+
+namespace perfetto {
+
+class SharedMemoryArbiterImpl;
+class StartupTraceWriter;
+class StartupTraceWriterRegistry;
+
+namespace base {
+class TaskRunner;
+} // namespace base
+
+// Notifies the registry about the destruction of a StartupTraceWriter, provided
+// the registry itself wasn't deleted yet. The indirection via the handle is
+// necessary to avoid potential deadlocks caused by lock order inversion. These
+// issues are avoided by locking on the handle's common lock in the destructors
+// of the registry and writer.
+class StartupTraceWriterRegistryHandle {
+ public:
+ explicit StartupTraceWriterRegistryHandle(StartupTraceWriterRegistry*);
+
+ // Called by StartupTraceWriter destructor.
+ void OnWriterDestroyed(StartupTraceWriter*);
+
+ // Called by StartupTraceWriterRegistry destructor.
+ void OnRegistryDestroyed();
+
+ private:
+ StartupTraceWriterRegistryHandle(const StartupTraceWriterRegistryHandle&) =
+ delete;
+ StartupTraceWriterRegistryHandle& operator=(
+ const StartupTraceWriterRegistryHandle&) = delete;
+
+ std::mutex lock_;
+ StartupTraceWriterRegistry* registry_;
+};
+
+// Embedders can use this registry to create unbound StartupTraceWriters during
+// startup, and later bind them all safely to an arbiter and target buffer.
+class PERFETTO_EXPORT StartupTraceWriterRegistry {
+ public:
+ StartupTraceWriterRegistry();
+ ~StartupTraceWriterRegistry();
+
+ // Returns a new unbound StartupTraceWriter. Should only be called while
+ // unbound. Usually called on a writer thread.
+ std::unique_ptr<StartupTraceWriter> CreateUnboundTraceWriter();
+
+ // Return an unbound StartupTraceWriter back to the registry before it could
+ // be bound (usually called when the writer's thread is destroyed). The
+ // registry will keep this writer alive until the registry is bound to an
+ // arbiter (or destroyed itself). This way, its buffered data is retained.
+ // Should only be called while unbound. All packets written to the passed
+ // writer should have been completed and it should no longer be used to write
+ // data after calling this method.
+ void ReturnUnboundTraceWriter(std::unique_ptr<StartupTraceWriter>);
+
+ // Binds all StartupTraceWriters created by this registry to the given arbiter
+ // and target buffer. Should only be called once and on the passed
+ // TaskRunner's sequence. See
+ // SharedMemoryArbiter::BindStartupTraceWriterRegistry() for details.
+ //
+ // Note that the writers may not be bound synchronously if they are
+ // concurrently being written to. The registry will retry on the passed
+ // TaskRunner until all writers were bound successfully.
+ //
+ // Calls |on_bound_callback| asynchronously on |trace_writer| once all writers
+ // were bound.
+ void BindToArbiter(
+ SharedMemoryArbiterImpl*,
+ BufferID target_buffer,
+ base::TaskRunner*,
+ std::function<void(StartupTraceWriterRegistry*)> on_bound_callback);
+
+ private:
+ friend class StartupTraceWriterRegistryHandle;
+ friend class StartupTraceWriterTest;
+
+ StartupTraceWriterRegistry(const StartupTraceWriterRegistry&) = delete;
+ StartupTraceWriterRegistry& operator=(const StartupTraceWriterRegistry&) =
+ delete;
+
+ // Called by StartupTraceWriterRegistryHandle.
+ void OnStartupTraceWriterDestroyed(StartupTraceWriter*);
+
+ // Try to bind the remaining unbound writers and post a continuation to
+ // |task_runner_| if any writers could not be bound.
+ void TryBindWriters();
+
+ // Notifies the arbiter when we have bound all writers. May delete |this|.
+ void OnUnboundWritersRemovedLocked();
+
+ std::shared_ptr<StartupTraceWriterRegistryHandle> handle_;
+
+ // Begin lock-protected members.
+ std::mutex lock_;
+
+ // Unbound writers that we handed out to writer threads. These writers may be
+ // concurrently written to by the writer threads.
+ std::set<StartupTraceWriter*> unbound_writers_;
+
+ // Unbound writers that writer threads returned to the registry by calling
+ // ReturnUnboundTraceWriter(). Writers are removed from |unbound_writers_|
+ // when they are added to |unbound_owned_writers_|. No new data can be written
+ // to these writers.
+ std::vector<std::unique_ptr<StartupTraceWriter>> unbound_owned_writers_;
+
+ SharedMemoryArbiterImpl* arbiter_ = nullptr; // |nullptr| while unbound.
+ BufferID target_buffer_ = 0;
+ base::TaskRunner* task_runner_;
+ std::function<void(StartupTraceWriterRegistry*)> on_bound_callback_ = nullptr;
+
+ // Keep at the end. Initialized during |BindToArbiter()|, like |task_runner_|.
+ // Weak pointers are only valid on |task_runner_|'s thread/sequence.
+ std::unique_ptr<base::WeakPtrFactory<StartupTraceWriterRegistry>>
+ weak_ptr_factory_;
+ // End lock-protected members.
+};
+
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_TRACING_CORE_STARTUP_TRACE_WRITER_REGISTRY_H_
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index 4cf124e..06969cc 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -55,6 +55,7 @@
"core/sliced_protobuf_input_stream.cc",
"core/sliced_protobuf_input_stream.h",
"core/startup_trace_writer.cc",
+ "core/startup_trace_writer_registry.cc",
"core/sys_stats_config.cc",
"core/test_config.cc",
"core/trace_buffer.cc",
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index e4d0e2d..9a3f2d6 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -21,7 +21,7 @@
#include "perfetto/base/time.h"
#include "perfetto/tracing/core/commit_data_request.h"
#include "perfetto/tracing/core/shared_memory.h"
-#include "perfetto/tracing/core/startup_trace_writer.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"
@@ -276,9 +276,35 @@
new TraceWriterImpl(this, id, target_buffer));
}
-bool SharedMemoryArbiterImpl::BindStartupTraceWriter(StartupTraceWriter* writer,
- BufferID target_buffer) {
- return writer->BindToArbiter(this, target_buffer);
+void SharedMemoryArbiterImpl::BindStartupTraceWriterRegistry(
+ std::unique_ptr<StartupTraceWriterRegistry> registry,
+ BufferID target_buffer) {
+ // The registry will be owned by the arbiter, so it's safe to capture |this|
+ // in the callback.
+ auto on_bound_callback = [this](StartupTraceWriterRegistry* bound_registry) {
+ std::unique_ptr<StartupTraceWriterRegistry> registry_to_delete;
+ {
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+
+ for (auto it = startup_trace_writer_registries_.begin();
+ it != startup_trace_writer_registries_.end(); it++) {
+ if (it->get() == bound_registry) {
+ // We can't delete the registry while the arbiter's lock is held
+ // (to avoid lock inversion).
+ registry_to_delete = std::move(*it);
+ startup_trace_writer_registries_.erase(it);
+ break;
+ }
+ }
+ }
+
+ // The registry should have been in |startup_trace_writer_registries_|.
+ PERFETTO_DCHECK(registry_to_delete);
+ registry_to_delete.reset();
+ };
+ registry->BindToArbiter(this, target_buffer, task_runner_, on_bound_callback);
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+ startup_trace_writer_registries_.push_back(std::move(registry));
}
void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 0ef8abe..0707bcf 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -29,6 +29,7 @@
#include "perfetto/tracing/core/basic_types.h"
#include "perfetto/tracing/core/shared_memory_abi.h"
#include "perfetto/tracing/core/shared_memory_arbiter.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
#include "src/tracing/core/id_allocator.h"
namespace perfetto {
@@ -105,13 +106,15 @@
// See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID target_buffer) override;
- bool BindStartupTraceWriter(StartupTraceWriter* writer,
- BufferID target_buffer) override;
+ void BindStartupTraceWriterRegistry(
+ std::unique_ptr<StartupTraceWriterRegistry>,
+ BufferID target_buffer) override;
void NotifyFlushComplete(FlushRequestID) override;
private:
friend class TraceWriterImpl;
+ friend class StartupTraceWriterTest;
static SharedMemoryABI::PageLayout default_page_layout;
@@ -137,6 +140,10 @@
std::unique_ptr<CommitDataRequest> commit_data_req_;
size_t bytes_pending_commit_ = 0; // SUM(chunk.size() : commit_data_req_).
IdAllocator<WriterID> active_writer_ids_;
+ // Registries whose Bind() is in progress. We destroy each registry when their
+ // Bind() is complete or when the arbiter is destroyed itself.
+ std::vector<std::unique_ptr<StartupTraceWriterRegistry>>
+ startup_trace_writer_registries_;
// --- End lock-protected members ---
// Keep at the end.
diff --git a/src/tracing/core/startup_trace_writer.cc b/src/tracing/core/startup_trace_writer.cc
index eb1d320..f4f7f05 100644
--- a/src/tracing/core/startup_trace_writer.cc
+++ b/src/tracing/core/startup_trace_writer.cc
@@ -16,9 +16,13 @@
#include "perfetto/tracing/core/startup_trace_writer.h"
+#include <numeric>
+
#include "perfetto/base/logging.h"
+#include "perfetto/protozero/proto_utils.h"
#include "perfetto/trace/trace_packet.pbzero.h"
#include "perfetto/tracing/core/shared_memory_abi.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
#include "src/tracing/core/patch_list.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
@@ -54,9 +58,12 @@
LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
: buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}
- size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk, size_t num_bytes) {
- PERFETTO_CHECK(target_chunk->payload_size() >= num_bytes);
- uint8_t* chunk_payload = target_chunk->payload_begin();
+ size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk,
+ size_t num_bytes,
+ size_t cur_payload_size) {
+ PERFETTO_CHECK(target_chunk->payload_size() >=
+ num_bytes + cur_payload_size);
+ uint8_t* target_ptr = target_chunk->payload_begin() + cur_payload_size;
size_t bytes_read = 0;
while (bytes_read < num_bytes) {
if (cur_slice_ == buffer_slices_.end())
@@ -72,8 +79,8 @@
size_t read_size = std::min(num_bytes - bytes_read,
cur_slice_range.size() - cur_slice_offset_);
- memcpy(chunk_payload + bytes_read,
- cur_slice_range.begin + cur_slice_offset_, read_size);
+ memcpy(target_ptr + bytes_read, cur_slice_range.begin + cur_slice_offset_,
+ read_size);
cur_slice_offset_ += read_size;
bytes_read += read_size;
@@ -84,6 +91,23 @@
return bytes_read;
}
+ size_t TotalUsedSize() const {
+ size_t used_size = 0;
+ for (const auto& slice : buffer_slices_) {
+ used_size += slice.GetUsedRange().size();
+ }
+ return used_size;
+ }
+
+ bool DidReadAllData() const {
+ if (cur_slice_ == buffer_slices_.end())
+ return true;
+
+ const auto next_slice = cur_slice_ + 1;
+ return next_slice == buffer_slices_.end() &&
+ cur_slice_->GetUsedRange().size() == cur_slice_offset_;
+ }
+
private:
const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;
@@ -96,8 +120,10 @@
} // namespace
-StartupTraceWriter::StartupTraceWriter()
- : memory_buffer_(new protozero::ScatteredHeapBuffer()),
+StartupTraceWriter::StartupTraceWriter(
+ std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle)
+ : registry_handle_(std::move(registry_handle)),
+ memory_buffer_(new protozero::ScatteredHeapBuffer()),
memory_stream_writer_(
new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
memory_buffer_->set_writer(memory_stream_writer_.get());
@@ -108,7 +134,10 @@
std::unique_ptr<TraceWriter> trace_writer)
: was_bound_(true), trace_writer_(std::move(trace_writer)) {}
-StartupTraceWriter::~StartupTraceWriter() = default;
+StartupTraceWriter::~StartupTraceWriter() {
+ if (registry_handle_)
+ registry_handle_->OnWriterDestroyed(this);
+}
bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
BufferID target_buffer) {
@@ -126,18 +155,18 @@
PERFETTO_DCHECK(cur_packet_->is_finalized());
cur_packet_.reset();
}
- memory_stream_writer_.reset();
trace_writer_ = arbiter->CreateTraceWriter(target_buffer);
-
ChunkID next_chunk_id = CommitLocalBufferChunks(
arbiter, trace_writer_->writer_id(), target_buffer);
- memory_buffer_.reset();
// The real TraceWriter should start writing at the subsequent chunk ID.
bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
PERFETTO_DCHECK(success);
+ memory_stream_writer_.reset();
+ memory_buffer_.reset();
+
return true;
}
@@ -195,7 +224,8 @@
}
// Can't flush while unbound.
- callback();
+ if (callback)
+ callback();
}
WriterID StartupTraceWriter::writer_id() const {
@@ -245,7 +275,6 @@
// Finalize() is a no-op because the packet is already finalized.
uint32_t packet_size = cur_packet_->Finalize();
packet_sizes_.push_back(packet_size);
- total_payload_size += packet_size;
// Write is complete, reset the flag to allow binding.
std::lock_guard<std::mutex> lock(lock_);
@@ -266,8 +295,13 @@
if (packet_sizes_.empty() || !writer_id)
return 0;
+ memory_buffer_->AdjustUsedSizeOfCurrentSlice();
LocalBufferReader local_buffer_reader(memory_buffer_.get());
+ PERFETTO_DCHECK(local_buffer_reader.TotalUsedSize() ==
+ std::accumulate(packet_sizes_.begin(), packet_sizes_.end(),
+ static_cast<size_t>(0u)));
+
ChunkID next_chunk_id = 0;
SharedMemoryABI::Chunk cur_chunk =
NewChunk(arbiter, writer_id, next_chunk_id++, false);
@@ -284,29 +318,41 @@
do {
uint32_t fragment_size = static_cast<uint32_t>(
std::min(static_cast<size_t>(remaining_packet_size),
- max_payload_size - cur_payload_size));
+ max_payload_size - cur_payload_size -
+ SharedMemoryABI::kPacketHeaderSize));
+ // Write packet header, i.e. the fragment size.
+ protozero::proto_utils::WriteRedundantVarInt(
+ fragment_size, cur_chunk.payload_begin() + cur_payload_size);
+ cur_payload_size += SharedMemoryABI::kPacketHeaderSize;
+
+ // Copy packet content into the chunk.
+ size_t bytes_read = local_buffer_reader.ReadBytes(
+ &cur_chunk, fragment_size, cur_payload_size);
+ PERFETTO_DCHECK(bytes_read == fragment_size);
+
cur_payload_size += fragment_size;
remaining_packet_size -= fragment_size;
bool last_write =
packet_idx == total_num_packets - 1 && remaining_packet_size == 0;
- // Find num_packets that we should copy into current chunk and their
- // payload_size.
- bool write_chunk = cur_num_packets == ChunkHeader::Packets::kMaxCount ||
- cur_payload_size == max_payload_size || last_write;
+ // We should return the current chunk if we've filled its payload, reached
+ // the maximum number of packets, or wrote everything we wanted to.
+ bool return_chunk =
+ cur_payload_size >=
+ max_payload_size - SharedMemoryABI::kPacketHeaderSize ||
+ cur_num_packets == ChunkHeader::Packets::kMaxCount || last_write;
- if (write_chunk) {
- // Write chunk payload.
- local_buffer_reader.ReadBytes(&cur_chunk, cur_payload_size);
-
+ if (return_chunk) {
auto new_packet_count =
cur_chunk.IncreasePacketCountTo(cur_num_packets);
PERFETTO_DCHECK(new_packet_count == cur_num_packets);
bool is_fragmenting = remaining_packet_size > 0;
- if (is_fragmenting)
+ if (is_fragmenting) {
+ PERFETTO_DCHECK(cur_payload_size == max_payload_size);
cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
+ }
arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
&empty_patch_list);
@@ -327,6 +373,8 @@
// The last chunk should have been returned.
PERFETTO_DCHECK(!cur_chunk.is_valid());
+ // We should have read all data from the local buffer.
+ PERFETTO_DCHECK(local_buffer_reader.DidReadAllData());
return next_chunk_id;
}
diff --git a/src/tracing/core/startup_trace_writer_registry.cc b/src/tracing/core/startup_trace_writer_registry.cc
new file mode 100644
index 0000000..6de4ab0
--- /dev/null
+++ b/src/tracing/core/startup_trace_writer_registry.cc
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
+
+#include <functional>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/tracing/core/startup_trace_writer.h"
+#include "src/tracing/core/shared_memory_arbiter_impl.h"
+
+using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
+
+namespace perfetto {
+
+StartupTraceWriterRegistryHandle::StartupTraceWriterRegistryHandle(
+ StartupTraceWriterRegistry* registry)
+ : registry_(registry) {}
+
+void StartupTraceWriterRegistryHandle::OnWriterDestroyed(
+ StartupTraceWriter* writer) {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (registry_)
+ registry_->OnStartupTraceWriterDestroyed(writer);
+}
+
+void StartupTraceWriterRegistryHandle::OnRegistryDestroyed() {
+ std::lock_guard<std::mutex> lock(lock_);
+ registry_ = nullptr;
+}
+
+StartupTraceWriterRegistry::StartupTraceWriterRegistry()
+ : handle_(std::make_shared<StartupTraceWriterRegistryHandle>(this)) {}
+
+StartupTraceWriterRegistry::~StartupTraceWriterRegistry() {
+ handle_->OnRegistryDestroyed();
+}
+
+std::unique_ptr<StartupTraceWriter>
+StartupTraceWriterRegistry::CreateUnboundTraceWriter() {
+ std::lock_guard<std::mutex> lock(lock_);
+ PERFETTO_DCHECK(!arbiter_); // Should only be called while unbound.
+ std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter(handle_));
+ unbound_writers_.insert(writer.get());
+ return writer;
+}
+
+void StartupTraceWriterRegistry::ReturnUnboundTraceWriter(
+ std::unique_ptr<StartupTraceWriter> trace_writer) {
+ std::lock_guard<std::mutex> lock(lock_);
+ PERFETTO_DCHECK(!arbiter_); // Should only be called while unbound.
+ PERFETTO_DCHECK(!trace_writer->write_in_progress_);
+ PERFETTO_DCHECK(unbound_writers_.count(trace_writer.get()));
+ unbound_writers_.erase(trace_writer.get());
+ unbound_owned_writers_.push_back(std::move(trace_writer));
+}
+
+void StartupTraceWriterRegistry::BindToArbiter(
+ SharedMemoryArbiterImpl* arbiter,
+ BufferID target_buffer,
+ base::TaskRunner* task_runner,
+ std::function<void(StartupTraceWriterRegistry*)> on_bound_callback) {
+ std::vector<std::unique_ptr<StartupTraceWriter>> unbound_owned_writers;
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ PERFETTO_DCHECK(!arbiter_);
+ arbiter_ = arbiter;
+ target_buffer_ = target_buffer;
+ task_runner_ = task_runner;
+ weak_ptr_factory_.reset(
+ new base::WeakPtrFactory<StartupTraceWriterRegistry>(this));
+ on_bound_callback_ = std::move(on_bound_callback);
+ // We can't destroy the writers while holding |lock_|, so we swap them out
+ // here instead. After we are bound, no more writers can be added to the
+ // list.
+ unbound_owned_writers.swap(unbound_owned_writers_);
+ }
+
+ // Bind and destroy the owned writers.
+ for (const auto& writer : unbound_owned_writers) {
+ // This should succeed since nobody can write to these writers concurrently.
+ bool success = writer->BindToArbiter(arbiter_, target_buffer_);
+ PERFETTO_DCHECK(success);
+ }
+ unbound_owned_writers.clear();
+
+ TryBindWriters();
+}
+
+void StartupTraceWriterRegistry::TryBindWriters() {
+ std::lock_guard<std::mutex> lock(lock_);
+ for (auto it = unbound_writers_.begin(); it != unbound_writers_.end();) {
+ if ((*it)->BindToArbiter(arbiter_, target_buffer_)) {
+ it = unbound_writers_.erase(it);
+ } else {
+ it++;
+ }
+ }
+ if (!unbound_writers_.empty()) {
+ auto weak_this = weak_ptr_factory_->GetWeakPtr();
+ task_runner_->PostTask([weak_this] {
+ if (weak_this)
+ weak_this->TryBindWriters();
+ });
+ }
+ OnUnboundWritersRemovedLocked();
+}
+
+void StartupTraceWriterRegistry::OnStartupTraceWriterDestroyed(
+ StartupTraceWriter* trace_writer) {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (unbound_writers_.erase(trace_writer) > 0)
+ OnUnboundWritersRemovedLocked();
+}
+
+void StartupTraceWriterRegistry::OnUnboundWritersRemovedLocked() {
+ if (!unbound_writers_.empty() || !task_runner_ || !on_bound_callback_)
+ return;
+
+ PERFETTO_DCHECK(weak_ptr_factory_);
+ auto weak_this = weak_ptr_factory_->GetWeakPtr();
+ // Run callback in PostTask() since the callback may delete |this| and thus
+ // might otherwise cause a deadlock.
+ auto callback = on_bound_callback_;
+ on_bound_callback_ = nullptr;
+ task_runner_->PostTask([weak_this, callback]() {
+ if (!weak_this)
+ return;
+ // Note: callback may delete |this|.
+ callback(weak_this.get());
+ });
+}
+
+} // namespace perfetto
diff --git a/src/tracing/core/startup_trace_writer_unittest.cc b/src/tracing/core/startup_trace_writer_unittest.cc
index fd9ea45..2566455 100644
--- a/src/tracing/core/startup_trace_writer_unittest.cc
+++ b/src/tracing/core/startup_trace_writer_unittest.cc
@@ -17,17 +17,21 @@
#include "perfetto/tracing/core/startup_trace_writer.h"
#include "gtest/gtest.h"
+#include "perfetto/tracing/core/startup_trace_writer_registry.h"
+#include "perfetto/tracing/core/trace_packet.h"
#include "perfetto/tracing/core/tracing_service.h"
#include "src/base/test/test_task_runner.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
+#include "src/tracing/core/sliced_protobuf_input_stream.h"
+#include "src/tracing/core/trace_buffer.h"
#include "src/tracing/test/aligned_buffer_test.h"
#include "src/tracing/test/fake_producer_endpoint.h"
#include "perfetto/trace/test_event.pbzero.h"
+#include "perfetto/trace/trace_packet.pb.h"
#include "perfetto/trace/trace_packet.pbzero.h"
namespace perfetto {
-namespace {
class StartupTraceWriterTest : public AlignedBufferTest {
public:
@@ -46,16 +50,29 @@
task_runner_.reset();
}
+ std::unique_ptr<StartupTraceWriter> CreateUnboundWriter() {
+ std::shared_ptr<StartupTraceWriterRegistryHandle> registry;
+ return std::unique_ptr<StartupTraceWriter>(
+ new StartupTraceWriter(registry));
+ }
+
+ bool BindWriter(StartupTraceWriter* writer) {
+ const BufferID kBufId = 42;
+ return writer->BindToArbiter(arbiter_.get(), kBufId);
+ }
+
void WritePackets(StartupTraceWriter* writer, size_t packet_count) {
for (size_t i = 0; i < packet_count; i++) {
auto packet = writer->NewTracePacket();
- packet->set_for_testing()->set_str("foo");
+ packet->set_for_testing()->set_str(kPacketPayload);
}
}
- void VerifyPacketCount(size_t expected_count) {
+ void VerifyPackets(size_t expected_count) {
SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
- size_t packets_count = 0;
+ auto buffer = TraceBuffer::Create(abi->size());
+
+ size_t total_packets_count = 0;
ChunkID current_max_chunk_id = 0;
for (size_t page_idx = 0; page_idx < kNumPages; page_idx++) {
uint32_t page_layout = abi->GetPageLayout(page_idx);
@@ -77,19 +94,74 @@
current_max_chunk_id = std::max(current_max_chunk_id, chunk_id);
auto packets_header = chunk.header()->packets.load();
- packets_count += packets_header.count;
+ total_packets_count += packets_header.count;
if (packets_header.flags &
SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk) {
// Don't count fragmented packets twice.
- packets_count--;
+ total_packets_count--;
}
+
+ buffer->CopyChunkUntrusted(
+ /*producer_id_trusted=*/1, /*producer_uid_trusted=*/1,
+ chunk.header()->writer_id.load(), chunk_id, packets_header.count,
+ packets_header.flags, /*chunk_complete=*/true,
+ chunk.payload_begin(), chunk.payload_size());
abi->ReleaseChunkAsFree(std::move(chunk));
}
}
last_read_max_chunk_id_ = current_max_chunk_id;
- EXPECT_EQ(expected_count, packets_count);
+ EXPECT_EQ(expected_count, total_packets_count);
+
+ // Now verify chunk and packet contents.
+ buffer->BeginRead();
+ size_t num_packets_read = 0;
+ while (true) {
+ TracePacket packet;
+ uid_t producer_uid = kInvalidUid;
+ if (!buffer->ReadNextTracePacket(&packet, &producer_uid))
+ break;
+ EXPECT_EQ(static_cast<uid_t>(1), producer_uid);
+
+ SlicedProtobufInputStream stream(&packet.slices());
+ size_t size = 0;
+ for (const Slice& slice : packet.slices())
+ size += slice.size;
+ protos::TracePacket parsed_packet;
+ bool success = parsed_packet.ParseFromBoundedZeroCopyStream(
+ &stream, static_cast<int>(size));
+ EXPECT_TRUE(success);
+ if (!success)
+ break;
+ EXPECT_TRUE(parsed_packet.has_for_testing());
+ EXPECT_EQ(kPacketPayload, parsed_packet.for_testing().str());
+ num_packets_read++;
+ }
+ EXPECT_EQ(expected_count, num_packets_read);
}
+ size_t GetUnboundWriterCount(
+ const StartupTraceWriterRegistry& registry) const {
+ return registry.unbound_writers_.size() +
+ registry.unbound_owned_writers_.size();
+ }
+
+ size_t GetBindingRegistriesCount(
+ const SharedMemoryArbiterImpl& arbiter) const {
+ return arbiter.startup_trace_writer_registries_.size();
+ }
+
+ size_t GetUnboundWriterCount(const SharedMemoryArbiterImpl& arbiter) const {
+ size_t count = 0u;
+ for (const auto& reg : arbiter.startup_trace_writer_registries_) {
+ count += reg->unbound_writers_.size();
+ count += reg->unbound_owned_writers_.size();
+ }
+ return count;
+ }
+
+ protected:
+ static constexpr char kPacketPayload[] = "foo";
+
FakeProducerEndpoint fake_producer_endpoint_;
std::unique_ptr<base::TestTaskRunner> task_runner_;
std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
@@ -98,25 +170,27 @@
ChunkID last_read_max_chunk_id_ = 0;
};
+constexpr char StartupTraceWriterTest::kPacketPayload[];
+
+namespace {
+
size_t const kPageSizes[] = {4096, 65536};
INSTANTIATE_TEST_CASE_P(PageSize,
StartupTraceWriterTest,
::testing::ValuesIn(kPageSizes));
TEST_P(StartupTraceWriterTest, CreateUnboundAndBind) {
- // Create an unbound writer.
- std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+ auto writer = CreateUnboundWriter();
- // Bind it right away without having written any data before.
- const BufferID kBufId = 42;
- EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+ // Bind writer right away without having written any data before.
+ EXPECT_TRUE(BindWriter(writer.get()));
const size_t kNumPackets = 32;
WritePackets(writer.get(), kNumPackets);
// Finalizes the last packet and returns the chunk.
writer.reset();
- VerifyPacketCount(kNumPackets);
+ VerifyPackets(kNumPackets);
}
TEST_P(StartupTraceWriterTest, CreateBound) {
@@ -130,12 +204,11 @@
// Finalizes the last packet and returns the chunk.
writer.reset();
- VerifyPacketCount(kNumPackets);
+ VerifyPackets(kNumPackets);
}
TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndDiscard) {
- // Create an unbound writer.
- std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+ auto writer = CreateUnboundWriter();
const size_t kNumPackets = 32;
WritePackets(writer.get(), kNumPackets);
@@ -143,22 +216,20 @@
// Should discard the written data.
writer.reset();
- VerifyPacketCount(0);
+ VerifyPackets(0);
}
TEST_P(StartupTraceWriterTest, WriteWhileUnboundAndBind) {
- // Create an unbound writer.
- std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+ auto writer = CreateUnboundWriter();
const size_t kNumPackets = 32;
WritePackets(writer.get(), kNumPackets);
// Binding the writer should cause the previously written packets to be
// written to the SMB and committed.
- const BufferID kBufId = 42;
- EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+ EXPECT_TRUE(BindWriter(writer.get()));
- VerifyPacketCount(kNumPackets);
+ VerifyPackets(kNumPackets);
// Any further packets should be written to the SMB directly.
const size_t kNumAdditionalPackets = 16;
@@ -166,12 +237,11 @@
// Finalizes the last packet and returns the chunk.
writer.reset();
- VerifyPacketCount(kNumAdditionalPackets);
+ VerifyPackets(kNumAdditionalPackets);
}
TEST_P(StartupTraceWriterTest, WriteMultipleChunksWhileUnboundAndBind) {
- // Create an unbound writer.
- std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+ auto writer = CreateUnboundWriter();
// Write a single packet to determine its size in the buffer.
WritePackets(writer.get(), 1);
@@ -183,10 +253,9 @@
// Binding the writer should cause the previously written packets to be
// written to the SMB and committed.
- const BufferID kBufId = 42;
- EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+ EXPECT_TRUE(BindWriter(writer.get()));
- VerifyPacketCount(kNumPackets + 1);
+ VerifyPackets(kNumPackets + 1);
// Any further packets should be written to the SMB directly.
const size_t kNumAdditionalPackets = 16;
@@ -194,25 +263,62 @@
// Finalizes the last packet and returns the chunk.
writer.reset();
- VerifyPacketCount(kNumAdditionalPackets);
+ VerifyPackets(kNumAdditionalPackets);
}
TEST_P(StartupTraceWriterTest, BindingWhileWritingFails) {
- // Create an unbound writer.
- std::unique_ptr<StartupTraceWriter> writer(new StartupTraceWriter());
+ auto writer = CreateUnboundWriter();
- const BufferID kBufId = 42;
{
- // Begin a write by opening a TracePacket
+ // Begin a write by opening a TracePacket.
auto packet = writer->NewTracePacket();
+ packet->set_for_testing()->set_str(kPacketPayload);
// Binding while writing should fail.
- EXPECT_FALSE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
+ EXPECT_FALSE(BindWriter(writer.get()));
}
// Packet was completed, so binding should work now and emit the packet.
- EXPECT_TRUE(arbiter_->BindStartupTraceWriter(writer.get(), kBufId));
- VerifyPacketCount(1);
+ EXPECT_TRUE(BindWriter(writer.get()));
+ VerifyPackets(1);
+}
+
+TEST_P(StartupTraceWriterTest, CreateAndBindViaRegistry) {
+ std::unique_ptr<StartupTraceWriterRegistry> registry(
+ new StartupTraceWriterRegistry());
+
+ // Create unbound writers.
+ auto writer1 = registry->CreateUnboundTraceWriter();
+ auto writer2 = registry->CreateUnboundTraceWriter();
+
+ EXPECT_EQ(2u, GetUnboundWriterCount(*registry));
+
+ // Return |writer2|. It should be kept alive until the registry is bound.
+ registry->ReturnUnboundTraceWriter(std::move(writer2));
+
+ {
+ // Begin a write by opening a TracePacket on |writer1|.
+ auto packet = writer1->NewTracePacket();
+
+ // Binding |writer1| writing should fail, but |writer2| should be bound.
+ const BufferID kBufId = 42;
+ arbiter_->BindStartupTraceWriterRegistry(std::move(registry), kBufId);
+ EXPECT_EQ(1u, GetUnboundWriterCount(*arbiter_));
+ }
+
+ // Wait for |writer1| to be bound and the registry to be deleted.
+ auto checkpoint_name = "all_bound";
+ auto all_bound = task_runner_->CreateCheckpoint(checkpoint_name);
+ std::function<void()> task;
+ task = [&task, &all_bound, this]() {
+ if (!GetBindingRegistriesCount(*arbiter_)) {
+ all_bound();
+ return;
+ }
+ task_runner_->PostDelayedTask(task, 1);
+ };
+ task_runner_->PostDelayedTask(task, 1);
+ task_runner_->RunUntilCheckpoint(checkpoint_name);
}
} // namespace