Merge "Add Flush support to Producer and expose API to Consumer"
diff --git a/include/perfetto/protozero/message_handle.h b/include/perfetto/protozero/message_handle.h
index 02bbaf9..5fbe05b 100644
--- a/include/perfetto/protozero/message_handle.h
+++ b/include/perfetto/protozero/message_handle.h
@@ -44,21 +44,22 @@
// Move-only type.
MessageHandleBase(MessageHandleBase&&) noexcept;
MessageHandleBase& operator=(MessageHandleBase&&);
-
- protected:
- explicit MessageHandleBase(Message* = nullptr);
- Message& operator*() const {
+ explicit operator bool() const {
#if PERFETTO_DCHECK_IS_ON()
PERFETTO_DCHECK(!message_ || generation_ == message_->generation_);
#endif
- return *message_;
+ return !!message_;
}
+
+ protected:
+ explicit MessageHandleBase(Message* = nullptr);
Message* operator->() const {
#if PERFETTO_DCHECK_IS_ON()
PERFETTO_DCHECK(!message_ || generation_ == message_->generation_);
#endif
return message_;
}
+ Message& operator*() const { return *(operator->()); }
private:
friend class Message;
diff --git a/include/perfetto/tracing/core/basic_types.h b/include/perfetto/tracing/core/basic_types.h
index 112f58a..a1ca0b0 100644
--- a/include/perfetto/tracing/core/basic_types.h
+++ b/include/perfetto/tracing/core/basic_types.h
@@ -34,6 +34,9 @@
// Unique within the scope of a Producer.
using WriterID = uint16_t;
+// Unique within the scope of the tracing service.
+using FlushRequestID = uint64_t;
+
// We need one FD per producer and we are not going to be able to keep > 64k FDs
// open in the service.
static constexpr ProducerID kMaxProducerID = static_cast<ProducerID>(-1);
diff --git a/include/perfetto/tracing/core/commit_data_request.h b/include/perfetto/tracing/core/commit_data_request.h
index d2e30cd..3f0042f 100644
--- a/include/perfetto/tracing/core/commit_data_request.h
+++ b/include/perfetto/tracing/core/commit_data_request.h
@@ -191,9 +191,13 @@
return &chunks_to_patch_.back();
}
+ uint64_t flush_request_id() const { return flush_request_id_; }
+ void set_flush_request_id(uint64_t value) { flush_request_id_ = value; }
+
private:
std::vector<ChunksToMove> chunks_to_move_;
std::vector<ChunkToPatch> chunks_to_patch_;
+ uint64_t flush_request_id_ = {};
// Allows to preserve unknown protobuf fields for compatibility
// with future versions of .proto files.
diff --git a/include/perfetto/tracing/core/producer.h b/include/perfetto/tracing/core/producer.h
index 87bcb03..a8e1991 100644
--- a/include/perfetto/tracing/core/producer.h
+++ b/include/perfetto/tracing/core/producer.h
@@ -79,6 +79,17 @@
// Called by the Service after OnConnect but before the first DataSource is
// created. Can be used for any setup required before tracing begins.
virtual void OnTracingSetup() = 0;
+
+ // Called by the service to request the Producer to commit the data of the
+ // given data sources and return their chunks into the shared memory buffer.
+ // The Producer is expected to invoke NotifyFlushComplete(FlushRequestID) on
+ // the Service after the data has been committed. The producer must either
+ // reply to the flush requests in order or just reply to the latest one:
+ // upon seeing a NotifyFlushComplete(N), the service will assume that all
+ // flushes < N have also been committed.
+ virtual void Flush(FlushRequestID,
+ const DataSourceInstanceID* data_source_ids,
+ size_t num_data_sources) = 0;
};
} // namespace perfetto
diff --git a/include/perfetto/tracing/core/service.h b/include/perfetto/tracing/core/service.h
index 250308e..8513667 100644
--- a/include/perfetto/tracing/core/service.h
+++ b/include/perfetto/tracing/core/service.h
@@ -95,6 +95,10 @@
// DataSourceConfig.target_buffer().
virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID target_buffer) = 0;
+
+ // Called in response to a Producer::Flush(request_id) call after all data
+ // for the flush request has been committed.
+ virtual void NotifyFlushComplete(FlushRequestID) = 0;
}; // class ProducerEndpoint.
// The API for the Consumer port of the Service.
@@ -113,6 +117,13 @@
base::ScopedFile = base::ScopedFile()) = 0;
virtual void DisableTracing() = 0;
+ // Requests all data sources to flush their data immediately and invokes the
+ // passed callback once all of them have acked the flush (in which case
+ // the callback argument |success| will be true) or |timeout_ms| milliseconds
+ // have elapsed (in which case |success| will be false).
+ using FlushCallback = std::function<void(bool /*success*/)>;
+ virtual void Flush(int timeout_ms, FlushCallback) = 0;
+
// Tracing data will be delivered invoking Consumer::OnTraceData().
virtual void ReadBuffers() = 0;
diff --git a/include/perfetto/tracing/core/shared_memory_arbiter.h b/include/perfetto/tracing/core/shared_memory_arbiter.h
index 2cd336c..27f7aec 100644
--- a/include/perfetto/tracing/core/shared_memory_arbiter.h
+++ b/include/perfetto/tracing/core/shared_memory_arbiter.h
@@ -50,6 +50,10 @@
virtual std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID target_buffer) = 0;
+ // Notifies the service that all data for the given FlushRequestID has been
+ // committed in the shared memory buffer.
+ virtual void NotifyFlushComplete(FlushRequestID) = 0;
+
// Implemented in src/core/shared_memory_arbiter_impl.cc .
static std::unique_ptr<SharedMemoryArbiter> CreateInstance(
SharedMemory*,
diff --git a/protos/perfetto/common/commit_data_request.proto b/protos/perfetto/common/commit_data_request.proto
index 4630528..59e29fb 100644
--- a/protos/perfetto/common/commit_data_request.proto
+++ b/protos/perfetto/common/commit_data_request.proto
@@ -70,4 +70,9 @@
optional bool has_more_patches = 5;
}
repeated ChunkToPatch chunks_to_patch = 2;
+
+ // Optional. If this commit is made in response to a Flush(id) request coming
+ // from the service, copy back the id of the request so the service can tell
+ // when the flush happened.
+ optional uint64 flush_request_id = 3;
}
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index abd3961..8c3e9b8 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -49,6 +49,15 @@
// implicitly if the Consumer disconnects.
rpc FreeBuffers(FreeBuffersRequest) returns (FreeBuffersResponse) {}
+ // Asks the service to request all data sources involved in the tracing
+ // session to commit their data into the trace buffer. The FlushResponse is
+ // sent only:
+ // - After the data has been committed (in which case FlushResponse succeeds)
+ // or
+ // - After FlushRequest.timeout_ms milliseconds (in which case the
+ // FlushResponse is rejected and fails).
+ rpc Flush(FlushRequest) returns (FlushResponse) {}
+
// TODO rpc ListDataSources(), for the UI.
}
@@ -102,3 +111,10 @@
}
message FreeBuffersResponse {}
+
+// Arguments for rpc Flush().
+message FlushRequest {
+ // Maximum time, in milliseconds, to wait for the data to be committed. Once
+ // it has elapsed the FlushResponse is rejected.
+ optional uint32 timeout_ms = 1;
+}
+
+message FlushResponse {}
diff --git a/protos/perfetto/ipc/producer_port.proto b/protos/perfetto/ipc/producer_port.proto
index 26a2670..1146128 100644
--- a/protos/perfetto/ipc/producer_port.proto
+++ b/protos/perfetto/ipc/producer_port.proto
@@ -113,10 +113,22 @@
// buffer.
message SetupTracing { optional uint64 shared_buffer_page_size_kb = 1; }
+ // Command sent by the service to ask the producer to commit the data of the
+ // listed data sources.
+ message Flush {
+ // The instance id (i.e. StartDataSource.new_instance_id) of the data
+ // sources to flush.
+ repeated uint64 data_source_ids = 1;
+
+ // A monotonic counter generated by the service. The producer is simply
+ // expected to copy this value back into the CommitDataRequest, so the
+ // service can tell when the data for this flush has been committed.
+ optional uint64 request_id = 2;
+ }
+
oneof cmd {
StartDataSource start_data_source = 1;
StopDataSource stop_data_source = 2;
SetupTracing setup_tracing = 3;
// id == 4 was teardown_tracing, never implemented.
+ Flush flush = 5;
}
}
diff --git a/src/perfetto_cmd/perfetto_cmd.cc b/src/perfetto_cmd/perfetto_cmd.cc
index fd9dc50..4f7e514 100644
--- a/src/perfetto_cmd/perfetto_cmd.cc
+++ b/src/perfetto_cmd/perfetto_cmd.cc
@@ -260,7 +260,7 @@
// Failsafe mechanism to avoid waiting indefinitely if the service hangs.
if (trace_config_->duration_ms()) {
task_runner_.PostDelayedTask(std::bind(&PerfettoCmd::OnTimeout, this),
- trace_config_->duration_ms() * 2);
+ trace_config_->duration_ms() + 10000);
}
}
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index fdc8785..0a15b88 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -36,7 +36,6 @@
constexpr uint64_t kScanIntervalMs = 10000; // 10s
constexpr uint64_t kScanDelayMs = 10000; // 10s
constexpr uint64_t kScanBatchSize = 15000;
-constexpr uint64_t kFlushBeforeEndMs = 500;
uint64_t OrDefault(uint64_t value, uint64_t def) {
if (value != 0)
@@ -127,24 +126,7 @@
static_file_map_(static_file_map),
cache_(cache),
writer_(std::move(writer)),
- weak_factory_(this) {
- if (kFlushBeforeEndMs < source_config_.trace_duration_ms()) {
- auto weak_this = GetWeakPtr();
- // Flush TracePacket of current scan shortly before we expect the trace
- // to end, to retain information from any scan that might be in
- // progress.
- task_runner_->PostDelayedTask(
- [weak_this] {
- if (!weak_this) {
- PERFETTO_DLOG("Giving up flush.");
- return;
- }
- PERFETTO_DLOG("Flushing.");
- weak_this->ResetTracePacket();
- },
- source_config_.trace_duration_ms() - kFlushBeforeEndMs);
- }
-}
+ weak_factory_(this) {}
void InodeFileDataSource::AddInodesFromStaticMap(
BlockDeviceID block_device_id,
@@ -192,6 +174,11 @@
PERFETTO_DLOG("%" PRIu64 " inodes found in cache", cache_found_count);
}
+// Resets the current trace packet (ResetTracePacket()) and then flushes the
+// trace writer.
+void InodeFileDataSource::Flush() {
+ ResetTracePacket();
+ writer_->Flush();
+}
+
void InodeFileDataSource::OnInodes(
const std::vector<std::pair<Inode, BlockDeviceID>>& inodes) {
if (mount_points_.empty()) {
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index a2be1f8..6dfa415 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -75,6 +75,8 @@
void AddInodesFromLRUCache(BlockDeviceID block_device_id,
std::set<Inode>* inode_numbers);
+ void Flush();
+
virtual ~InodeFileDataSource() {}
virtual void FillInodeEntry(InodeFileMap* destination,
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index abdbf07..d37f41d 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -251,6 +251,30 @@
void ProbesProducer::OnTracingSetup() {}
+// Flushes every data source instance listed in |data_source_ids| that is
+// found in one of this producer's source maps, then acks |flush_request_id|
+// to the service endpoint.
+void ProbesProducer::Flush(FlushRequestID flush_request_id,
+                           const DataSourceInstanceID* data_source_ids,
+                           size_t num_data_sources) {
+  const DataSourceInstanceID* const ids_end =
+      data_source_ids + num_data_sources;
+  for (const DataSourceInstanceID* id = data_source_ids; id != ids_end; ++id) {
+    // The instance id is looked up in each of the three source maps in turn.
+    auto process_stats_it = process_stats_sources_.find(*id);
+    if (process_stats_it != process_stats_sources_.end())
+      process_stats_it->second->Flush();
+
+    auto file_map_it = file_map_sources_.find(*id);
+    if (file_map_it != file_map_sources_.end())
+      file_map_it->second->Flush();
+
+    auto delegate_it = delegates_.find(*id);
+    if (delegate_it != delegates_.end())
+      delegate_it->second->Flush();
+  }
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
void ProbesProducer::ConnectWithRetries(const char* socket_name,
base::TaskRunner* task_runner) {
PERFETTO_DCHECK(state_ == kNotStarted);
@@ -289,6 +313,15 @@
ProbesProducer::SinkDelegate::~SinkDelegate() = default;
+// Flushes the trace writer, unless there is no writer or a trace packet is
+// still being written (i.e. it exists and has not been finalized yet).
+void ProbesProducer::SinkDelegate::Flush() {
+  // TODO(primiano): this still doesn't flush data from the kernel ftrace
+  // buffers (see b/73886018). We should do that and delay the
+  // NotifyFlushComplete() until the ftrace data has been drained from the
+  // kernel ftrace buffer and written in the SMB.
+  if (!writer_)
+    return;
+  if (trace_packet_ && !trace_packet_->is_finalized())
+    return;
+  writer_->Flush();
+}
+
ProbesProducer::FtraceBundleHandle
ProbesProducer::SinkDelegate::GetBundleForCpu(size_t) {
trace_packet_ = writer_->NewTracePacket();
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 72fede0..186fc23 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -48,6 +48,9 @@
const DataSourceConfig&) override;
void TearDownDataSourceInstance(DataSourceInstanceID) override;
void OnTracingSetup() override;
+ void Flush(FlushRequestID,
+ const DataSourceInstanceID* data_source_ids,
+ size_t num_data_sources) override;
// Our Impl
void ConnectWithRetries(const char* socket_name,
@@ -77,6 +80,8 @@
TracingSessionID session_id() const { return session_id_; }
+ void Flush();
+
// FtraceDelegateImpl
FtraceBundleHandle GetBundleForCpu(size_t cpu) override;
void OnBundleComplete(size_t cpu,
@@ -111,6 +116,7 @@
// Keep this after the TraceWriter because TracePackets must not outlive
// their originating writer.
TraceWriter::TracePacketHandle trace_packet_;
+
// Keep this last.
base::WeakPtrFactory<SinkDelegate> weak_factory_;
};
diff --git a/src/traced/probes/process_stats_data_source.cc b/src/traced/probes/process_stats_data_source.cc
index 7afe6ab..7c9aa9f 100644
--- a/src/traced/probes/process_stats_data_source.cc
+++ b/src/traced/probes/process_stats_data_source.cc
@@ -56,11 +56,6 @@
WriteProcess(pid, process_tree);
seen_pids->insert(pid);
});
-
- trace_packet->Finalize();
-
- // TODO(hjd): Remove this once the service flushes the producers on teardown.
- writer_->Flush();
}
void ProcessStatsDataSource::OnPids(const std::vector<int32_t>& pids) {
@@ -78,6 +73,10 @@
}
}
+// Flushes the trace writer of this data source.
+void ProcessStatsDataSource::Flush() {
+ writer_->Flush();
+}
+
// static
void ProcessStatsDataSource::WriteProcess(int32_t pid,
protos::pbzero::ProcessTree* tree) {
diff --git a/src/traced/probes/process_stats_data_source.h b/src/traced/probes/process_stats_data_source.h
index 5054af1..7a64269 100644
--- a/src/traced/probes/process_stats_data_source.h
+++ b/src/traced/probes/process_stats_data_source.h
@@ -42,6 +42,7 @@
base::WeakPtr<ProcessStatsDataSource> GetWeakPtr() const;
void WriteAllProcesses();
void OnPids(const std::vector<int32_t>& pids);
+ void Flush();
private:
static void WriteProcess(int32_t pid, protos::pbzero::ProcessTree*);
diff --git a/src/tracing/core/commit_data_request.cc b/src/tracing/core/commit_data_request.cc
index 9468465..1ee0fbe 100644
--- a/src/tracing/core/commit_data_request.cc
+++ b/src/tracing/core/commit_data_request.cc
@@ -52,6 +52,11 @@
chunks_to_patch_.emplace_back();
chunks_to_patch_.back().FromProto(field);
}
+
+ static_assert(sizeof(flush_request_id_) == sizeof(proto.flush_request_id()),
+ "size mismatch");
+ flush_request_id_ =
+ static_cast<decltype(flush_request_id_)>(proto.flush_request_id());
unknown_fields_ = proto.unknown_fields();
}
@@ -68,6 +73,11 @@
auto* entry = proto->add_chunks_to_patch();
it.ToProto(entry);
}
+
+ static_assert(sizeof(flush_request_id_) == sizeof(proto->flush_request_id()),
+ "size mismatch");
+ proto->set_flush_request_id(
+ static_cast<decltype(proto->flush_request_id())>(flush_request_id_));
*(proto->mutable_unknown_fields()) = unknown_fields_;
}
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index da63a07..def0f55 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -55,6 +55,7 @@
constexpr base::TimeMillis kClockSnapshotInterval(10 * 1000);
constexpr int kMinWriteIntoFilePeriodMs = 100;
constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
+constexpr int kFlushTimeoutMs = 1000;
constexpr uint64_t kMillisPerHour = 3600000;
@@ -329,7 +330,7 @@
task_runner_->PostDelayedTask(
[weak_this, tsid] {
if (weak_this)
- weak_this->DisableTracing(tsid);
+ weak_this->FlushAndDisableTracing(tsid);
},
cfg.duration_ms());
}
@@ -391,6 +392,101 @@
// needed to call ReadBuffers(). FreeBuffers() will erase() the session.
}
+// Asks every producer that has data sources in the tracing session |tsid| to
+// flush them, and reports the outcome through |callback|:
+// - |callback(true)| is posted once all the involved producers have acked;
+// - |callback(false)| is invoked if |tsid| is invalid, if too many flushes
+//   are already pending, or if |timeout_ms| elapses while the flush is still
+//   pending.
+void ServiceImpl::Flush(TracingSessionID tsid,
+                        int timeout_ms,
+                        ConsumerEndpoint::FlushCallback callback) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  TracingSession* tracing_session = GetTracingSession(tsid);
+  if (!tracing_session) {
+    PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
+    // Report the failure instead of dropping the callback, otherwise the
+    // caller would never learn the outcome of its request.
+    callback(false);
+    return;
+  }
+
+  if (tracing_session->pending_flushes.size() > 1000) {
+    PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
+                  tracing_session->pending_flushes.size());
+    callback(false);
+    return;
+  }
+
+  // Request ids only grow, so the new entry always sorts last in the map:
+  // hint the insertion at end().
+  FlushRequestID flush_request_id = ++last_flush_request_id_;
+  PendingFlush& pending_flush =
+      tracing_session->pending_flushes
+          .emplace_hint(tracing_session->pending_flushes.end(),
+                        flush_request_id, PendingFlush(std::move(callback)))
+          ->second;
+
+  // Send a flush request to each producer involved in the tracing session. In
+  // order to issue a flush request we have to build a map of all data source
+  // instance ids enabled for each producer.
+  std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
+  for (const auto& data_source_inst : tracing_session->data_source_instances) {
+    const ProducerID producer_id = data_source_inst.first;
+    const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
+    flush_map[producer_id].push_back(ds_inst_id);
+  }
+
+  for (const auto& kv : flush_map) {
+    ProducerID producer_id = kv.first;
+    ProducerEndpointImpl* producer = GetProducer(producer_id);
+    const std::vector<DataSourceInstanceID>& data_sources = kv.second;
+    producer->Flush(flush_request_id, data_sources);
+    // Remember that an ack from this producer is still outstanding.
+    pending_flush.producers.insert(producer_id);
+  }
+
+  // Arm the timeout. It is a no-op if the flush has been acked by then.
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  task_runner_->PostDelayedTask(
+      [weak_this, tsid, flush_request_id] {
+        if (weak_this)
+          weak_this->OnFlushTimeout(tsid, flush_request_id);
+      },
+      timeout_ms);
+}
+
+// Records that |producer_id| has acked |flush_request_id|. In every tracing
+// session, the producer is removed from all pending flushes with an id <=
+// |flush_request_id|; each flush left with no outstanding producers is
+// completed by posting its callback with success == true.
+void ServiceImpl::NotifyFlushDoneForProducer(ProducerID producer_id,
+                                             FlushRequestID flush_request_id) {
+  for (auto& session_entry : tracing_sessions_) {
+    auto& flushes = session_entry.second.pending_flushes;
+    // First entry with an id strictly greater than |flush_request_id|.
+    auto stop = flushes.upper_bound(flush_request_id);
+    auto cur = flushes.begin();
+    while (cur != stop) {
+      PendingFlush& flush = cur->second;
+      flush.producers.erase(producer_id);
+      if (!flush.producers.empty()) {
+        cur++;
+        continue;
+      }
+      // Every producer has acked: notify the requester and drop the entry.
+      task_runner_->PostTask(
+          std::bind(std::move(flush.callback), /*success=*/true));
+      cur = flushes.erase(cur);
+    }
+  }
+}
+
+// Timeout handler for a flush request. If the request is still pending in the
+// session |tsid|, it is removed and its callback is invoked with
+// success == false; otherwise this is a no-op.
+void ServiceImpl::OnFlushTimeout(TracingSessionID tsid,
+                                 FlushRequestID flush_request_id) {
+  TracingSession* session = GetTracingSession(tsid);
+  if (session == nullptr)
+    return;
+  auto& flushes = session->pending_flushes;
+  auto entry = flushes.find(flush_request_id);
+  if (entry == flushes.end())
+    return;  // Nominal case: flush was completed and acked on time.
+  // Take the callback out before erasing the entry that owns it.
+  auto timed_out_callback = std::move(entry->second.callback);
+  flushes.erase(entry);
+  timed_out_callback(/*success=*/false);
+}
+
+// Flushes the tracing session |tsid| (waiting at most kFlushTimeoutMs) and
+// disables tracing for it once the flush has completed or timed out.
+void ServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  auto weak_service = weak_ptr_factory_.GetWeakPtr();
+  ConsumerEndpoint::FlushCallback on_flush_done = [weak_service,
+                                                   tsid](bool success) {
+    PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
+                  success, tsid);
+    if (!weak_service)
+      return;
+    weak_service->DisableTracing(tsid);
+  };
+  Flush(tsid, kFlushTimeoutMs, std::move(on_flush_done));
+}
+
// Note: when this is called to write into a file passed when starting tracing
// |consumer| will be == nullptr (as opposite to the case of a consumer asking
// to send the trace data back over IPC).
@@ -1036,6 +1132,16 @@
tracing_session_id_ = 0;
}
+// Consumer-facing Flush(): forwards the request to the service for the
+// tracing session bound to this consumer. If no tracing session is active,
+// |callback| is invoked with success == false.
+void ServiceImpl::ConsumerEndpointImpl::Flush(int timeout_ms,
+                                              FlushCallback callback) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  if (!tracing_session_id_) {
+    PERFETTO_LOG("Consumer called Flush() but tracing was not active");
+    // Report the failure instead of silently dropping the request, so the
+    // caller is not left waiting for a reply that never comes.
+    callback(false);
+    return;
+  }
+  // |callback| is not used after this point: hand it over without copying.
+  service_->Flush(tracing_session_id_, timeout_ms, std::move(callback));
+}
+
base::WeakPtr<ServiceImpl::ConsumerEndpointImpl>
ServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
PERFETTO_DCHECK_THREAD(thread_checker_);
@@ -1130,6 +1236,10 @@
service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
+ if (req_untrusted.flush_request_id()) {
+ service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
+ }
+
// Keep this invocation last. ProducerIPCService::CommitData() relies on this
// callback being invoked within the same callstack and not posted. If this
// changes, the code there needs to be changed accordingly.
@@ -1160,6 +1270,7 @@
// TODO(primiano): When we'll support tearing down the SMB, at this point we
// should send the Producer a TearDownTracing if all its data sources have
// been disabled (see b/77532839 and aosp/655179 PS1).
+ PERFETTO_DCHECK_THREAD(thread_checker_);
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_->PostTask([weak_this, ds_inst_id] {
if (weak_this)
@@ -1192,9 +1303,23 @@
});
}
+// Posts a task that forwards the flush request for |data_sources| to the
+// producer. The task is a no-op if this endpoint has been destroyed by the
+// time it runs.
+void ServiceImpl::ProducerEndpointImpl::Flush(
+    FlushRequestID flush_request_id,
+    const std::vector<DataSourceInstanceID>& data_sources) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  auto weak_endpoint = weak_ptr_factory_.GetWeakPtr();
+  // |data_sources| is captured by copy because the task runs after this call
+  // has returned.
+  task_runner_->PostTask([weak_endpoint, flush_request_id, data_sources] {
+    if (!weak_endpoint)
+      return;
+    weak_endpoint->producer_->Flush(flush_request_id, data_sources.data(),
+                                    data_sources.size());
+  });
+}
+
void ServiceImpl::ProducerEndpointImpl::CreateDataSourceInstance(
DataSourceInstanceID ds_id,
const DataSourceConfig& config) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
auto weak_this = weak_ptr_factory_.GetWeakPtr();
task_runner_->PostTask([weak_this, ds_id, config] {
if (weak_this)
@@ -1202,6 +1327,11 @@
});
}
+// Forwards the flush ack for |id| to the shared memory arbiter of this
+// endpoint.
+void ServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(FlushRequestID id) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  GetOrCreateShmemArbiter()->NotifyFlushComplete(id);
+}
+
////////////////////////////////////////////////////////////////////////////////
// ServiceImpl::TracingSession implementation
////////////////////////////////////////////////////////////////////////////////
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index 8aa0981..65a9ca4 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -75,8 +75,10 @@
std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override;
void OnTracingSetup();
+ void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
void CreateDataSourceInstance(DataSourceInstanceID,
const DataSourceConfig&);
+ void NotifyFlushComplete(FlushRequestID) override;
void TearDownDataSource(DataSourceInstanceID);
SharedMemory* shared_memory() const override;
size_t shared_buffer_page_size_kb() const override;
@@ -119,6 +121,7 @@
void DisableTracing() override;
void ReadBuffers() override;
void FreeBuffers() override;
+ void Flush(int timeout_ms, FlushCallback) override;
private:
friend class ServiceImpl;
@@ -152,6 +155,7 @@
size_t size);
void ApplyChunkPatches(ProducerID,
const std::vector<CommitDataRequest::ChunkToPatch>&);
+ void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
// Called by ConsumerEndpointImpl.
void DisconnectConsumer(ConsumerEndpointImpl*);
@@ -159,6 +163,10 @@
const TraceConfig&,
base::ScopedFile);
void DisableTracing(TracingSessionID);
+ void Flush(TracingSessionID tsid,
+ int timeout_ms,
+ ConsumerEndpoint::FlushCallback);
+ void FlushAndDisableTracing(TracingSessionID);
void ReadBuffers(TracingSessionID, ConsumerEndpointImpl*);
void FreeBuffers(TracingSessionID);
@@ -190,6 +198,12 @@
std::string data_source_name;
};
+  // State of one flush request that has not completed yet.
+  struct PendingFlush {
+    explicit PendingFlush(ConsumerEndpoint::FlushCallback cb)
+        : callback(std::move(cb)) {}
+
+    // Producers from which an ack for this flush is still awaited.
+    std::set<ProducerID> producers;
+
+    // Invoked with the outcome of the flush (true on success).
+    ConsumerEndpoint::FlushCallback callback;
+  };
+
// Holds the state of a tracing session. A tracing session is uniquely bound
// a specific Consumer. Each Consumer can own one or more sessions.
struct TracingSession {
@@ -214,6 +228,10 @@
// producers for this tracing session.
std::multimap<ProducerID, DataSourceInstance> data_source_instances;
+ // For each Flush(N) request, keeps track of the set of producers for which
+ // we are still awaiting a NotifyFlushComplete(N) ack.
+ std::map<FlushRequestID, PendingFlush> pending_flushes;
+
// Maps a per-trace-session buffer index into the corresponding global
// BufferID (shared namespace amongst all consumers). This vector has as
// many entries as |config.buffers_size()|.
@@ -258,7 +276,7 @@
void MaybeSnapshotClocks(TracingSession*, std::vector<TracePacket>*);
void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
-
+ void OnFlushTimeout(TracingSessionID, FlushRequestID);
TraceBuffer* GetBufferByID(BufferID);
base::TaskRunner* const task_runner_;
@@ -266,17 +284,14 @@
ProducerID last_producer_id_ = 0;
DataSourceInstanceID last_data_source_instance_id_ = 0;
TracingSessionID last_tracing_session_id_ = 0;
+ FlushRequestID last_flush_request_id_ = 0;
// Buffer IDs are global across all consumers (because a Producer can produce
// data for more than one trace session, hence more than one consumer).
IdAllocator<BufferID> buffer_ids_;
std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
-
- // TODO(primiano): There doesn't seem to be any good reason why |producers_|
- // is a map indexed by ID and not just a set<ProducerEndpointImpl*>.
std::map<ProducerID, ProducerEndpointImpl*> producers_;
-
std::set<ConsumerEndpointImpl*> consumers_;
std::map<TracingSessionID, TracingSession> tracing_sessions_;
std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index d166192..61c0aa6 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -84,6 +84,13 @@
return svc->GetProducer(producer_id)->uid_;
}
+  // Returns the number of flush requests still pending in the tracing session
+  // identified by svc->last_tracing_session_id_. If that session does not
+  // exist, records a test failure and returns 0.
+  size_t GetNumPendingFlushes() {
+    ServiceImpl::TracingSession* tracing_session =
+        svc->GetTracingSession(svc->last_tracing_session_id_);
+    EXPECT_NE(nullptr, tracing_session);
+    // EXPECT_* is non-fatal: bail out explicitly so that a missing session is
+    // reported as a test failure rather than a null dereference.
+    if (!tracing_session)
+      return 0;
+    return tracing_session->pending_flushes.size();
+  }
+
base::TestTaskRunner task_runner;
std::unique_ptr<ServiceImpl> svc;
};
@@ -384,4 +391,140 @@
ASSERT_THAT(actual_shm_sizes_kb, ElementsAreArray(kExpectedSizesKb));
}
+// Verifies that a Flush() explicitly requested by the consumer is acked once
+// the producer has flushed its writer, and that the packet written before the
+// flush shows up in the buffers read back by the consumer.
+TEST_F(ServiceImplTest, ExplicitFlush) {
+ std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+ consumer->Connect(svc.get());
+
+ std::unique_ptr<MockProducer> producer = CreateMockProducer();
+ producer->Connect(svc.get(), "mock_producer");
+ producer->RegisterDataSource("data_source");
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(128);
+ auto* ds_config = trace_config.add_data_sources()->mutable_config();
+ ds_config->set_name("data_source");
+
+ consumer->EnableTracing(trace_config);
+ producer->WaitForTracingSetup();
+ producer->WaitForDataSourceStart("data_source");
+
+ std::unique_ptr<TraceWriter> writer =
+ producer->CreateTraceWriter("data_source");
+ {
+ auto tp = writer->NewTracePacket();
+ tp->set_for_testing()->set_str("payload");
+ }
+
+ // Issue the flush and expect it to be acked successfully.
+ auto flush_request = consumer->Flush();
+ producer->WaitForFlush(writer.get());
+ ASSERT_TRUE(flush_request.WaitForReply());
+
+ consumer->DisableTracing();
+ producer->WaitForDataSourceStop("data_source");
+ consumer->WaitForTracingDisabled();
+ EXPECT_THAT(
+ consumer->ReadBuffers(),
+ Contains(Property(&protos::TracePacket::for_testing,
+ Property(&protos::TestEvent::str, Eq("payload")))));
+}
+
+// Verifies that a trace with a configured duration flushes the producer when
+// the duration expires, without the consumer calling Flush(), and that the
+// packet written before that shows up in the buffers read back.
+TEST_F(ServiceImplTest, ImplicitFlushOnTimedTraces) {
+ std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+ consumer->Connect(svc.get());
+
+ std::unique_ptr<MockProducer> producer = CreateMockProducer();
+ producer->Connect(svc.get(), "mock_producer");
+ producer->RegisterDataSource("data_source");
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(128);
+ auto* ds_config = trace_config.add_data_sources()->mutable_config();
+ ds_config->set_name("data_source");
+ // A 1 ms duration makes the trace end almost immediately.
+ trace_config.set_duration_ms(1);
+
+ consumer->EnableTracing(trace_config);
+ producer->WaitForTracingSetup();
+ producer->WaitForDataSourceStart("data_source");
+
+ std::unique_ptr<TraceWriter> writer =
+ producer->CreateTraceWriter("data_source");
+ {
+ auto tp = writer->NewTracePacket();
+ tp->set_for_testing()->set_str("payload");
+ }
+
+ // No explicit consumer->Flush() here: the flush is expected to be requested
+ // by the service itself.
+ producer->WaitForFlush(writer.get());
+
+ producer->WaitForDataSourceStop("data_source");
+ consumer->WaitForTracingDisabled();
+
+ EXPECT_THAT(
+ consumer->ReadBuffers(),
+ Contains(Property(&protos::TracePacket::for_testing,
+ Property(&protos::TestEvent::str, Eq("payload")))));
+}
+
+// Tests the monotonic semantics of flush request IDs, i.e., once a producer
+// acks flush request N, all flush requests <= N are considered successful and
+// acked to the consumer.
+TEST_F(ServiceImplTest, BatchFlushes) {
+ std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+ consumer->Connect(svc.get());
+
+ std::unique_ptr<MockProducer> producer = CreateMockProducer();
+ producer->Connect(svc.get(), "mock_producer");
+ producer->RegisterDataSource("data_source");
+
+ TraceConfig trace_config;
+ trace_config.add_buffers()->set_size_kb(128);
+ auto* ds_config = trace_config.add_data_sources()->mutable_config();
+ ds_config->set_name("data_source");
+
+ consumer->EnableTracing(trace_config);
+ producer->WaitForTracingSetup();
+ producer->WaitForDataSourceStart("data_source");
+
+ std::unique_ptr<TraceWriter> writer =
+ producer->CreateTraceWriter("data_source");
+ {
+ auto tp = writer->NewTracePacket();
+ tp->set_for_testing()->set_str("payload");
+ }
+
+ auto flush_req_1 = consumer->Flush();
+ auto flush_req_2 = consumer->Flush();
+ auto flush_req_3 = consumer->Flush();
+
+ // We'll deliberately let the 4th flush request time out. Use a lower timeout
+ // to keep test time short.
+ auto flush_req_4 = consumer->Flush(/*timeout_ms=*/10);
+ ASSERT_EQ(4u, GetNumPendingFlushes());
+
+ // Make the producer reply only to the 3rd flush request.
+ testing::InSequence seq;
+ producer->WaitForFlush(nullptr); // Will NOT reply to flush id == 1.
+ producer->WaitForFlush(nullptr); // Will NOT reply to flush id == 2.
+ producer->WaitForFlush(writer.get()); // Will reply only to flush id == 3.
+ producer->WaitForFlush(nullptr); // Will NOT reply to flush id == 4.
+
+ // Even if the producer explicitly replied only to flush ID == 3, all the
+ // previous flushes < 3 should be implicitly acked.
+ ASSERT_TRUE(flush_req_1.WaitForReply());
+ ASSERT_TRUE(flush_req_2.WaitForReply());
+ ASSERT_TRUE(flush_req_3.WaitForReply());
+
+ // At this point flush id == 4 should still be pending and should fail because
+ // of reaching its timeout.
+ ASSERT_EQ(1u, GetNumPendingFlushes());
+ ASSERT_FALSE(flush_req_4.WaitForReply());
+
+ consumer->DisableTracing();
+ producer->WaitForDataSourceStop("data_source");
+ consumer->WaitForTracingDisabled();
+ EXPECT_THAT(
+ consumer->ReadBuffers(),
+ Contains(Property(&protos::TracePacket::for_testing,
+ Property(&protos::TestEvent::str, Eq("payload")))));
+}
+
} // namespace perfetto
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index b6c6fd9..f0960f8 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -246,6 +246,31 @@
new TraceWriterImpl(this, id, target_buffer));
}
+// Records |req_id| as the flush request id to be reported in the pending
+// CommitDataRequest, creating the request (and posting the task that sends
+// it) if none is queued yet.
+void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
+ bool should_post_commit_task = false;
+ {
+ std::lock_guard<std::mutex> scoped_lock(lock_);
+ // If a commit_data_req_ exists it means that somebody else already posted a
+ // FlushPendingCommitDataRequests() task.
+ if (!commit_data_req_) {
+ commit_data_req_.reset(new CommitDataRequest());
+ should_post_commit_task = true;
+ } else {
+ // If there is another request queued and that also contains a reply to a
+ // flush request, reply with the highest id.
+ req_id = std::max(req_id, commit_data_req_->flush_request_id());
+ }
+ commit_data_req_->set_flush_request_id(req_id);
+ }
+ // The task is posted after |lock_| has been released.
+ if (should_post_commit_task) {
+ auto weak_this = weak_ptr_factory_.GetWeakPtr();
+ task_runner_->PostTask([weak_this] {
+ if (weak_this)
+ weak_this->FlushPendingCommitDataRequests();
+ });
+ }
+}
+
void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
std::lock_guard<std::mutex> scoped_lock(lock_);
active_writer_ids_.Free(id);
diff --git a/src/tracing/core/shared_memory_arbiter_impl.h b/src/tracing/core/shared_memory_arbiter_impl.h
index 691d6b8..beefef6 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.h
+++ b/src/tracing/core/shared_memory_arbiter_impl.h
@@ -96,6 +96,8 @@
std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID target_buffer = 0) override;
+ void NotifyFlushComplete(FlushRequestID) override;
+
private:
friend class TraceWriterImpl;
diff --git a/src/tracing/core/shared_memory_arbiter_impl_unittest.cc b/src/tracing/core/shared_memory_arbiter_impl_unittest.cc
index ed96226..a7414ec 100644
--- a/src/tracing/core/shared_memory_arbiter_impl_unittest.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl_unittest.cc
@@ -37,6 +37,7 @@
public:
void RegisterDataSource(const DataSourceDescriptor&) override {}
void UnregisterDataSource(const std::string&) override {}
+ void NotifyFlushComplete(FlushRequestID) override {}
SharedMemory* shared_memory() const override { return nullptr; }
size_t shared_buffer_page_size_kb() const override { return 0; }
std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index ad9156a..7a1a8eb 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -35,6 +35,7 @@
void RegisterDataSource(const DataSourceDescriptor&) override {}
void UnregisterDataSource(const std::string&) override {}
void CommitData(const CommitDataRequest&, CommitDataCallback) override {}
+ void NotifyFlushComplete(FlushRequestID) override {}
SharedMemory* shared_memory() const override { return nullptr; }
size_t shared_buffer_page_size_kb() const override { return 0; }
std::unique_ptr<TraceWriter> CreateTraceWriter(BufferID) override {
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index c3382b5..6da7b23 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -156,4 +156,20 @@
consumer_port_.FreeBuffers(req, std::move(async_response));
}
+void ConsumerIPCClientImpl::Flush(int timeout_ms, FlushCallback callback) {
+  // Without a connection the request cannot be sent: report failure right away.
+  if (!connected_) {
+    PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
+    callback(/*success=*/false);
+    return;
+  }
+
+  // |callback| is invoked with the boolean value of the IPC reply.
+  ipc::Deferred<protos::FlushResponse> deferred_reply;
+  deferred_reply.Bind(
+      [callback](ipc::AsyncResult<protos::FlushResponse> reply) {
+        const bool flush_succeeded = !!reply;
+        callback(flush_succeeded);
+      });
+
+  protos::FlushRequest flush_request;
+  flush_request.set_timeout_ms(timeout_ms);
+  consumer_port_.Flush(flush_request, std::move(deferred_reply));
+}
+
} // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index d94b658..8b771a0 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -63,6 +63,7 @@
void DisableTracing() override;
void ReadBuffers() override;
void FreeBuffers() override;
+ void Flush(int timeout_ms, FlushCallback) override;
// ipc::ServiceProxy::EventListener implementation.
// These methods are invoked by the IPC layer, which knows nothing about
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.cc b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
index f6484b9..e43da50 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.cc
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.cc
@@ -138,8 +138,22 @@
return;
}
+  if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kFlush) {
+    // This cast boilerplate is required only because protobuf uses its own
+    // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
+    // type (long vs long long) even though they have the same size.
+    // The array holds data source instance ids, so the size check and the cast
+    // must name DataSourceInstanceID (the element type Producer::Flush() takes),
+    // not FlushRequestID, which is only the type of the request id.
+    const auto* data_source_ids = cmd.flush().data_source_ids().data();
+    static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
+                  "data_source_ids should be 64-bit");
+    producer_->Flush(
+        cmd.flush().request_id(),
+        reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
+        cmd.flush().data_source_ids().size());
+    return;
+  }
+
PERFETTO_DLOG("Unknown async request %d received from tracing service",
cmd.cmd_case());
+ PERFETTO_DCHECK(false);
}
void ProducerIPCClientImpl::RegisterDataSource(
@@ -205,6 +219,10 @@
return shared_memory_arbiter_->CreateTraceWriter(target_buffer);
}
+// Forwards the flush acknowledgement for |req_id| to the shared memory arbiter.
+void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
+  shared_memory_arbiter_->NotifyFlushComplete(req_id);
+}
+
SharedMemory* ProducerIPCClientImpl::shared_memory() const {
return shared_memory_.get();
}
diff --git a/src/tracing/ipc/producer/producer_ipc_client_impl.h b/src/tracing/ipc/producer/producer_ipc_client_impl.h
index 1f4ec68..01f35c1 100644
--- a/src/tracing/ipc/producer/producer_ipc_client_impl.h
+++ b/src/tracing/ipc/producer/producer_ipc_client_impl.h
@@ -65,6 +65,7 @@
void CommitData(const CommitDataRequest&, CommitDataCallback) override;
std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID target_buffer) override;
+ void NotifyFlushComplete(FlushRequestID) override;
SharedMemory* shared_memory() const override;
size_t shared_buffer_page_size_kb() const override;
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index 3c599f4..6a291b2 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -93,6 +93,33 @@
resp.Resolve(ipc::AsyncResult<protos::FreeBuffersResponse>::Create());
}
+// Called by the IPC layer.
+void ConsumerIPCService::Flush(const protos::FlushRequest& req,
+                               DeferredFlushResponse resp) {
+  // Park the deferred response at the back of the pending list. The iterator
+  // to it travels with the callback so OnFlushCallback() can find the entry.
+  const auto pending_it = pending_flush_responses_.insert(
+      pending_flush_responses_.end(), std::move(resp));
+
+  // The weak pointer turns the callback into a no-op once it is invalidated.
+  auto weak_this = weak_ptr_factory_.GetWeakPtr();
+  auto on_flush_done = [weak_this, pending_it](bool success) {
+    if (!weak_this)
+      return;
+    weak_this->OnFlushCallback(success, pending_it);
+  };
+
+  RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
+  remote_consumer->service_endpoint->Flush(req.timeout_ms(),
+                                           std::move(on_flush_done));
+}
+
+// Called by the service in response to a service_endpoint->Flush() request.
+void ConsumerIPCService::OnFlushCallback(
+    bool success,
+    PendingFlushResponses::iterator pending_response_it) {
+  // Move the deferred response out of the list and drop its (now empty) slot
+  // before replying.
+  DeferredFlushResponse response(std::move(*pending_response_it));
+  pending_flush_responses_.erase(pending_response_it);
+
+  if (!success) {
+    response.Reject();
+    return;
+  }
+  response.Resolve(ipc::AsyncResult<protos::FlushResponse>::Create());
+}
+
////////////////////////////////////////////////////////////////////////////////
// RemoteConsumer methods
////////////////////////////////////////////////////////////////////////////////
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index d3c02f7..d2b22ce 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -17,6 +17,7 @@
#ifndef SRC_TRACING_IPC_SERVICE_CONSUMER_IPC_SERVICE_H_
#define SRC_TRACING_IPC_SERVICE_CONSUMER_IPC_SERVICE_H_
+#include <list>
#include <map>
#include <memory>
#include <string>
@@ -52,6 +53,7 @@
DeferredReadBuffersResponse) override;
void FreeBuffers(const protos::FreeBuffersRequest&,
DeferredFreeBuffersResponse) override;
+ void Flush(const protos::FlushRequest&, DeferredFlushResponse) override;
void OnClientDisconnected() override;
private:
@@ -84,6 +86,9 @@
DeferredEnableTracingResponse enable_tracing_response;
};
+ // This has to be a container that doesn't invalidate iterators.
+ using PendingFlushResponses = std::list<DeferredFlushResponse>;
+
ConsumerIPCService(const ConsumerIPCService&) = delete;
ConsumerIPCService& operator=(const ConsumerIPCService&) = delete;
@@ -91,13 +96,17 @@
// the current IPC request.
RemoteConsumer* GetConsumerForCurrentRequest();
+ void OnFlushCallback(bool success, PendingFlushResponses::iterator);
+
Service* const core_service_;
// Maps IPC clients to ConsumerEndpoint instances registered on the
// |core_service_| business logic.
std::map<ipc::ClientID, std::unique_ptr<RemoteConsumer>> consumers_;
- base::WeakPtrFactory<ConsumerIPCService> weak_ptr_factory_;
+ PendingFlushResponses pending_flush_responses_;
+
+ base::WeakPtrFactory<ConsumerIPCService> weak_ptr_factory_; // Keep last.
};
} // namespace perfetto
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index abb3506..79161ca 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -241,4 +241,22 @@
async_producer_commands.Resolve(std::move(cmd));
}
+// Sends a flush command carrying |flush_request_id| and the given data source
+// ids to the remote producer through the pending async-command channel.
+void ProducerIPCService::RemoteProducer::Flush(
+    FlushRequestID flush_request_id,
+    const DataSourceInstanceID* data_source_ids,
+    size_t num_data_sources) {
+  // Nothing can be sent until the async-command response has been bound.
+  if (!async_producer_commands.IsBound()) {
+    PERFETTO_DLOG(
+        "The Service tried to request a flush but the remote Producer has not "
+        "yet initialized the connection");
+    return;
+  }
+
+  auto cmd = ipc::AsyncResult<protos::GetAsyncCommandResponse>::Create();
+  cmd.set_has_more(true);
+
+  auto* flush_cmd = cmd->mutable_flush();
+  flush_cmd->set_request_id(flush_request_id);
+  for (size_t idx = 0; idx < num_data_sources; ++idx)
+    flush_cmd->add_data_source_ids(data_source_ids[idx]);
+
+  async_producer_commands.Resolve(std::move(cmd));
+}
+
} // namespace perfetto
diff --git a/src/tracing/ipc/service/producer_ipc_service.h b/src/tracing/ipc/service/producer_ipc_service.h
index f0dacb9..ab795a4 100644
--- a/src/tracing/ipc/service/producer_ipc_service.h
+++ b/src/tracing/ipc/service/producer_ipc_service.h
@@ -73,6 +73,9 @@
const DataSourceConfig&) override;
void TearDownDataSourceInstance(DataSourceInstanceID) override;
void OnTracingSetup() override;
+ void Flush(FlushRequestID,
+ const DataSourceInstanceID* data_source_ids,
+ size_t num_data_sources) override;
// The interface obtained from the core service business logic through
// Service::ConnectProducer(this). This allows to invoke methods for a
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
index 6ea267e..86b83f8 100644
--- a/src/tracing/test/mock_consumer.cc
+++ b/src/tracing/test/mock_consumer.cc
@@ -68,6 +68,26 @@
task_runner_->RunUntilCheckpoint(checkpoint_name);
}
+// Issues a flush on the service endpoint and returns a FlushRequest whose
+// WaitForReply() runs the task runner until the flush callback has fired and
+// then yields the success flag passed to that callback.
+MockConsumer::FlushRequest MockConsumer::Flush(int timeout_ms) {
+  // The counter gives every call its own checkpoint name.
+  static int flush_count = 0;
+  const auto checkpoint_name =
+      "on_consumer_flush_" + std::to_string(flush_count++);
+  auto on_flush = task_runner_->CreateCheckpoint(checkpoint_name);
+
+  // Shared between the flush callback (writer) and the wait closure (reader).
+  auto flush_result = std::make_shared<bool>(false);
+  service_endpoint_->Flush(timeout_ms,
+                           [flush_result, on_flush](bool success) {
+                             *flush_result = success;
+                             on_flush();
+                           });
+
+  base::TestTaskRunner* task_runner = task_runner_;
+  return FlushRequest([flush_result, task_runner, checkpoint_name]() -> bool {
+    task_runner->RunUntilCheckpoint(checkpoint_name);
+    return *flush_result;
+  });
+}
+
std::vector<protos::TracePacket> MockConsumer::ReadBuffers() {
std::vector<protos::TracePacket> decoded_packets;
static int i = 0;
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
index f62dde2..bab7009 100644
--- a/src/tracing/test/mock_consumer.h
+++ b/src/tracing/test/mock_consumer.h
@@ -34,6 +34,15 @@
class MockConsumer : public Consumer {
public:
+  // Wraps a wait function; WaitForReply() runs it and returns its result.
+  class FlushRequest {
+    using WaitFunction = std::function<bool()>;
+
+   public:
+    FlushRequest(WaitFunction wait_func) : wait_func_(wait_func) {}
+
+    // Invokes the wrapped function and forwards its boolean result.
+    bool WaitForReply() { return wait_func_(); }
+
+   private:
+    WaitFunction wait_func_;
+  };
+
explicit MockConsumer(base::TestTaskRunner*);
~MockConsumer() override;
@@ -42,6 +51,7 @@
void DisableTracing();
void FreeBuffers();
void WaitForTracingDisabled();
+ FlushRequest Flush(int timeout_ms = 10000);
std::vector<protos::TracePacket> ReadBuffers();
Service::ConsumerEndpoint* endpoint() { return service_endpoint_.get(); }
diff --git a/src/tracing/test/mock_producer.cc b/src/tracing/test/mock_producer.cc
index 0b40990..78bca68 100644
--- a/src/tracing/test/mock_producer.cc
+++ b/src/tracing/test/mock_producer.cc
@@ -110,4 +110,16 @@
return service_endpoint_->CreateTraceWriter(buf_id);
}
+void MockProducer::WaitForFlush(TraceWriter* writer_to_flush) {
+  // The Flush() expectation is registered in both cases. An action is attached
+  // only when there is a writer to flush, so a null writer leaves the flush
+  // request unanswered.
+  auto& flush_expectation = EXPECT_CALL(*this, Flush(_, _, _));
+  if (writer_to_flush) {
+    auto flush_and_ack = [this, writer_to_flush](FlushRequestID flush_req_id,
+                                                 const DataSourceInstanceID*,
+                                                 size_t) {
+      writer_to_flush->Flush();
+      service_endpoint_->NotifyFlushComplete(flush_req_id);
+    };
+    flush_expectation.WillOnce(Invoke(flush_and_ack));
+  }
+}
+
} // namespace perfetto
diff --git a/src/tracing/test/mock_producer.h b/src/tracing/test/mock_producer.h
index 8af4c76..d734c7f 100644
--- a/src/tracing/test/mock_producer.h
+++ b/src/tracing/test/mock_producer.h
@@ -54,6 +54,10 @@
std::unique_ptr<TraceWriter> CreateTraceWriter(
const std::string& data_source_name);
+ // If |writer_to_flush| != nullptr flushes it and acks the flush request.
+ // If |writer_to_flush| == nullptr does NOT reply to the flush request.
+ void WaitForFlush(TraceWriter* writer_to_flush);
+
Service::ProducerEndpoint* endpoint() { return service_endpoint_.get(); }
// Producer implementation.
@@ -63,6 +67,8 @@
void(DataSourceInstanceID, const DataSourceConfig&));
MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
MOCK_METHOD0(OnTracingSetup, void());
+ MOCK_METHOD3(Flush,
+ void(FlushRequestID, const DataSourceInstanceID*, size_t));
private:
base::TestTaskRunner* const task_runner_;
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index 0d19fdc..3760683 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -61,6 +61,8 @@
MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
MOCK_METHOD0(uid, uid_t());
MOCK_METHOD0(OnTracingSetup, void());
+ MOCK_METHOD3(Flush,
+ void(FlushRequestID, const DataSourceInstanceID*, size_t));
};
class MockConsumer : public Consumer {
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index 9ca5430..b509647 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -98,6 +98,7 @@
void TearDownDataSourceInstance(DataSourceInstanceID) override {}
void OnTracingSetup() override {}
+ void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override {}
private:
const std::string name_;
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 2069b5d..7f14805 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -120,4 +120,13 @@
void FakeProducer::OnTracingSetup() {}
+// Flushes the trace writer, if there is one, and then acknowledges
+// |flush_request_id| to the service endpoint.
+void FakeProducer::Flush(FlushRequestID flush_request_id,
+                         const DataSourceInstanceID*,
+                         size_t num_data_sources) {
+  PERFETTO_DCHECK(num_data_sources > 0);
+  if (trace_writer_) {
+    trace_writer_->Flush();
+  }
+  // The acknowledgement is sent whether or not a writer exists.
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
} // namespace perfetto
diff --git a/test/fake_producer.h b/test/fake_producer.h
index ecf9d89..418c01e 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -50,6 +50,7 @@
const DataSourceConfig& source_config) override;
void TearDownDataSourceInstance(DataSourceInstanceID) override;
void OnTracingSetup() override;
+ void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override;
private:
void Shutdown();