blob: bbb65577fa54cd5e45124726c56f03e62b238205 [file] [log] [blame]
/*
* Copyright (C) 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/tracing/internal/tracing_muxer_impl.h"
#include <algorithm>
#include <atomic>
#include <vector>
#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/tracing/core/data_source_config.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/tracing/data_source.h"
#include "perfetto/tracing/internal/data_source_internal.h"
#include "perfetto/tracing/trace_writer_base.h"
#include "perfetto/tracing/tracing.h"
#include "perfetto/tracing/tracing_backend.h"
#include "src/tracing/internal/in_process_tracing_backend.h"
#include "src/tracing/internal/system_tracing_backend.h"
namespace perfetto {
namespace internal {
// ----- Begin of TracingMuxerImpl::ProducerImpl
TracingMuxerImpl::ProducerImpl::ProducerImpl(TracingMuxerImpl* muxer,
TracingBackendId backend_id)
: muxer_(muxer), backend_id_(backend_id) {}
TracingMuxerImpl::ProducerImpl::~ProducerImpl() = default;
void TracingMuxerImpl::ProducerImpl::Initialize(
std::unique_ptr<ProducerEndpoint> endpoint) {
service_ = std::move(endpoint);
}
void TracingMuxerImpl::ProducerImpl::OnConnect() {
PERFETTO_DLOG("Producer connected");
PERFETTO_DCHECK_THREAD(thread_checker_);
PERFETTO_DCHECK(!connected_);
connected_ = true;
muxer_->UpdateDataSourcesOnAllBackends();
}
void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
PERFETTO_DCHECK_THREAD(thread_checker_);
connected_ = false;
PERFETTO_DFATAL("Producer::OnDisconnect not implemented"); // TODO.
}
void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
PERFETTO_DCHECK_THREAD(thread_checker_);
}
void TracingMuxerImpl::ProducerImpl::SetupDataSource(
DataSourceInstanceID id,
const DataSourceConfig& cfg) {
PERFETTO_DCHECK_THREAD(thread_checker_);
muxer_->SetupDataSource(backend_id_, id, cfg);
}
void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
const DataSourceConfig&) {
PERFETTO_DCHECK_THREAD(thread_checker_);
muxer_->StartDataSource(backend_id_, id);
service_->NotifyDataSourceStarted(id);
}
void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
muxer_->StopDataSource(backend_id_, id);
service_->NotifyDataSourceStopped(id);
}
void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
const DataSourceInstanceID*,
size_t) {
// Flush is not plumbed for now, we just ack straight away.
PERFETTO_DCHECK_THREAD(thread_checker_);
service_->NotifyFlushComplete(flush_id);
}
void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
const DataSourceInstanceID*,
size_t) {
PERFETTO_DCHECK_THREAD(thread_checker_);
}
// ----- End of TracingMuxerImpl::ProducerImpl methods.
// ----- Begin of TracingMuxerImpl::ConsumerImpl
TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
TracingBackendId backend_id,
TracingSessionGlobalID session_id)
: muxer_(muxer), backend_id_(backend_id), session_id_(session_id) {}
TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() = default;
void TracingMuxerImpl::ConsumerImpl::Initialize(
std::unique_ptr<ConsumerEndpoint> endpoint) {
PERFETTO_DCHECK_THREAD(thread_checker_);
service_ = std::move(endpoint);
}
void TracingMuxerImpl::ConsumerImpl::OnConnect() {
PERFETTO_DCHECK_THREAD(thread_checker_);
PERFETTO_DCHECK(!connected_);
connected_ = true;
// If the API client configured and started tracing before we connected,
// tell the backend about it now.
if (trace_config_) {
muxer_->SetupTracingSession(session_id_, trace_config_);
if (start_pending_)
muxer_->StartTracingSession(session_id_);
}
}
void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
PERFETTO_DCHECK_THREAD(thread_checker_);
// It shouldn't be necessary to call StopTracingSession. If we get this call
// it means that the service did shutdown before us, so there is no point
// trying it to ask it to stop the session. We should just remember to cleanup
// the consumer vector.
connected_ = false;
// TODO notify the client somehow.
}
void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled() {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (stop_complete_callback_)
muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
}
void TracingMuxerImpl::ConsumerImpl::OnTraceData(
std::vector<TracePacket> packets,
bool has_more) {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (!read_trace_callback_)
return;
size_t capacity = 0;
for (const auto& packet : packets) {
// 16 is an over-estimation of the proto preamble size
capacity += packet.size() + 16;
}
// The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
buf->reserve(capacity);
for (auto& packet : packets) {
char* start;
size_t size;
std::tie(start, size) = packet.GetProtoPreamble();
buf->insert(buf->end(), start, start + size);
for (auto& slice : packet.slices()) {
const auto* slice_data = reinterpret_cast<const char*>(slice.start);
buf->insert(buf->end(), slice_data, slice_data + slice.size);
}
}
auto callback = read_trace_callback_;
muxer_->task_runner_->PostTask([callback, buf, has_more] {
TracingSession::ReadTraceCallbackArgs callback_arg{};
callback_arg.data = &(*buf)[0];
callback_arg.size = buf->size();
callback_arg.has_more = has_more;
callback(callback_arg);
});
if (!has_more)
read_trace_callback_ = nullptr;
}
// The callbacks below are not used.
void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
void TracingMuxerImpl::ConsumerImpl::OnTraceStats(bool, const TraceStats&) {}
void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
const ObservableEvents&) {}
// ----- End of TracingMuxerImpl::ConsumerImpl
// ----- Begin of TracingMuxerImpl::TracingSessionImpl
// TracingSessionImpl is the RAII object returned to API clients when they
// invoke Tracing::CreateTracingSession. They use it for starting/stopping
// tracing.
TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
TracingMuxerImpl* muxer,
TracingSessionGlobalID session_id)
: muxer_(muxer), session_id_(session_id) {}
// Can be destroyed from any thread.
TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask(
[muxer, session_id] { muxer->DestroyTracingSession(session_id); });
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg) {
auto* muxer = muxer_;
auto session_id = session_id_;
std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
muxer->task_runner_->PostTask([muxer, session_id, trace_config] {
muxer->SetupTracingSession(session_id, trace_config);
});
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::Start() {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask(
[muxer, session_id] { muxer->StartTracingSession(session_id); });
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::Stop() {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask(
[muxer, session_id] { muxer->StopTracingSession(session_id); });
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask([muxer, session_id, cb] {
muxer->ReadTracingSessionData(session_id, std::move(cb));
});
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
std::function<void()> cb) {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask([muxer, session_id, cb] {
auto* consumer = muxer->FindConsumer(session_id);
consumer->stop_complete_callback_ = cb;
});
}
// ----- End of TracingMuxerImpl::TracingSessionImpl
// static
TracingMuxer* TracingMuxer::instance_ = nullptr;
// This is called by perfetto::Tracing::Initialize().
// Can be called on any thread. Typically, but not necessarily, that will be
// the embedder's main thread.
TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
: TracingMuxer(args.platform ? args.platform
: Platform::GetDefaultPlatform()) {
PERFETTO_DETACH_FROM_THREAD(thread_checker_);
// Create the thread where muxer, producers and service will live.
task_runner_ = platform_->CreateTaskRunner({});
// Run the initializer on that thread.
task_runner_->PostTask([this, args] { Initialize(args); });
}
void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
auto add_backend = [this](TracingBackend* backend, BackendType type) {
TracingBackendId backend_id = backends_.size();
backends_.emplace_back();
RegisteredBackend& rb = backends_.back();
rb.backend = backend;
rb.id = backend_id;
rb.type = type;
rb.producer.reset(new ProducerImpl(this, backend_id));
TracingBackend::ConnectProducerArgs conn_args;
conn_args.producer = rb.producer.get();
conn_args.producer_name = platform_->GetCurrentProcessName();
conn_args.task_runner = task_runner_.get();
rb.producer->Initialize(rb.backend->ConnectProducer(conn_args));
};
if (args.backends & kSystemBackend)
add_backend(SystemTracingBackend::GetInstance(), kSystemBackend);
if (args.backends & kInProcessBackend)
add_backend(InProcessTracingBackend::GetInstance(), kInProcessBackend);
if (args.backends & kCustomBackend) {
PERFETTO_CHECK(args.custom_backend);
add_backend(args.custom_backend, kCustomBackend);
}
if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
PERFETTO_FATAL("Unsupported tracing backend type");
}
}
// Can be called from any thread.
bool TracingMuxerImpl::RegisterDataSource(
const DataSourceDescriptor& descriptor,
DataSourceFactory factory,
DataSourceStaticState* static_state) {
static std::atomic<uint32_t> last_id{};
uint32_t new_index = last_id++;
if (new_index >= kMaxDataSources - 1) {
PERFETTO_DLOG(
"RegisterDataSource failed: too many data sources already registered");
return false;
}
// Initialize the static state.
static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
"instances[] size mismatch");
for (size_t i = 0; i < static_state->instances.size(); i++)
new (&static_state->instances[i]) DataSourceState{};
static_state->index = new_index;
task_runner_->PostTask([this, descriptor, factory, static_state] {
data_sources_.emplace_back();
RegisteredDataSource& rds = data_sources_.back();
rds.descriptor = descriptor;
rds.factory = factory;
rds.static_state = static_state;
UpdateDataSourcesOnAllBackends();
});
return true;
}
// Called by the service of one of the backends.
void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
DataSourceInstanceID instance_id,
const DataSourceConfig& cfg) {
PERFETTO_DCHECK_THREAD(thread_checker_);
PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
cfg.name().c_str());
for (const auto& rds : data_sources_) {
if (rds.descriptor.name() != cfg.name())
continue;
DataSourceStaticState& static_state = *rds.static_state;
for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
// Find a free slot.
if (static_state.TryGet(i))
continue;
auto* internal_state =
reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
std::lock_guard<std::mutex> guard(internal_state->lock);
static_assert(
std::is_same<decltype(internal_state->data_source_instance_id),
DataSourceInstanceID>::value,
"data_source_instance_id type mismatch");
internal_state->backend_id = backend_id;
internal_state->data_source_instance_id = instance_id;
internal_state->buffer_id =
static_cast<internal::BufferId>(cfg.target_buffer());
internal_state->data_source = rds.factory();
// This must be made at the end. See matching acquire-load in
// DataSource::Trace().
static_state.valid_instances.fetch_or(1 << i, std::memory_order_acq_rel);
DataSourceBase::SetupArgs setup_args;
setup_args.config = &cfg;
internal_state->data_source->OnSetup(setup_args);
break;
}
}
}
// Called by the service of one of the backends.
void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
DataSourceInstanceID instance_id) {
PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
PERFETTO_DCHECK_THREAD(thread_checker_);
for (const auto& rds : data_sources_) {
DataSourceStaticState& static_state = *rds.static_state;
for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
auto* internal_state = static_state.TryGet(i);
if (!internal_state)
continue;
if (internal_state->backend_id != backend_id ||
internal_state->data_source_instance_id != instance_id) {
continue;
}
std::lock_guard<std::mutex> guard(internal_state->lock);
internal_state->started = true;
internal_state->data_source->OnStart(DataSourceBase::StartArgs{});
return;
}
}
PERFETTO_ELOG("Could not find data source to start");
}
// Called by the service of one of the backends.
void TracingMuxerImpl::StopDataSource(TracingBackendId backend_id,
DataSourceInstanceID instance_id) {
PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
PERFETTO_DCHECK_THREAD(thread_checker_);
for (const auto& rds : data_sources_) {
DataSourceStaticState& static_state = *rds.static_state;
for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
auto* internal_state = static_state.TryGet(i);
if (!internal_state)
continue;
if (internal_state->backend_id != backend_id ||
internal_state->data_source_instance_id != instance_id) {
continue;
}
static_state.valid_instances.fetch_and(~(1 << i),
std::memory_order_acq_rel);
// Take the mutex to prevent that the data source is in the middle of
// a Trace() execution where it called GetDataSourceLocked() while we
// destroy it.
{
std::lock_guard<std::mutex> guard(internal_state->lock);
internal_state->started = false;
internal_state->data_source->OnStop(DataSourceBase::StopArgs{});
internal_state->data_source.reset();
}
// The other fields of internal_state are deliberately *not* cleared.
// See races-related comments of DataSource::Trace().
TracingMuxer::generation_++;
return;
}
}
PERFETTO_ELOG("Could not find data source to stop");
}
void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
// Iterate across all possible data source types.
auto cur_generation = generation_.load(std::memory_order_acquire);
auto* root_tls = GetOrCreateTracingTLS();
for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
// |tls| has a vector of per-data-source-instance thread-local state.
DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
DataSourceStaticState* static_state = tls.static_state;
if (!static_state)
continue; // Slot not used.
// Iterate across all possible instances for this data source.
for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
if (!ds_tls.trace_writer)
continue;
DataSourceState* ds_state = static_state->TryGet(inst);
if (ds_state && ds_state->backend_id == ds_tls.backend_id &&
ds_state->buffer_id == ds_tls.buffer_id) {
continue;
}
// The DataSource instance has been destroyed or recycled.
ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
}
}
root_tls->generation = cur_generation;
}
// Called both when a new data source is registered or when a new backend
// connects. In both cases we want to be sure we reflected the data source
// registrations on the backends.
void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
PERFETTO_DCHECK_THREAD(thread_checker_);
for (RegisteredDataSource& rds : data_sources_) {
for (RegisteredBackend& backend : backends_) {
// We cannot call RegisterDataSource on the backend before it connects.
if (!backend.producer->connected_)
continue;
PERFETTO_DCHECK(rds.static_state->index < kMaxDataSourceInstances);
if (backend.producer->registered_data_sources_.test(
rds.static_state->index))
continue;
rds.descriptor.set_will_notify_on_start(true);
rds.descriptor.set_will_notify_on_stop(true);
backend.producer->service_->RegisterDataSource(rds.descriptor);
backend.producer->registered_data_sources_.set(rds.static_state->index);
}
}
}
void TracingMuxerImpl::SetupTracingSession(
TracingSessionGlobalID session_id,
const std::shared_ptr<TraceConfig>& trace_config) {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto* consumer = FindConsumer(session_id);
if (!consumer)
return;
consumer->trace_config_ = trace_config;
if (!consumer->connected_)
return;
// Only used in the deferred start mode.
if (trace_config->deferred_start())
consumer->service_->EnableTracing(*trace_config);
}
void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto* consumer = FindConsumer(session_id);
if (!consumer)
return;
if (!consumer->trace_config_) {
PERFETTO_ELOG("Must call Setup(config) first");
return;
}
if (!consumer->connected_) {
consumer->start_pending_ = true;
return;
}
if (consumer->trace_config_->deferred_start()) {
consumer->service_->StartTracing();
} else {
consumer->service_->EnableTracing(*consumer->trace_config_);
}
// TODO implement support for the deferred-start + fast-triggering case.
}
void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto* consumer = FindConsumer(session_id);
if (!consumer)
return;
if (!consumer->trace_config_) {
PERFETTO_ELOG("Must call Setup(config) and Start() first");
return;
}
consumer->service_->DisableTracing();
consumer->trace_config_.reset();
}
void TracingMuxerImpl::DestroyTracingSession(
TracingSessionGlobalID session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
for (RegisteredBackend& backend : backends_) {
auto pred = [session_id](const std::unique_ptr<ConsumerImpl>& consumer) {
return consumer->session_id_ == session_id;
};
backend.consumers.erase(std::remove_if(backend.consumers.begin(),
backend.consumers.end(), pred),
backend.consumers.end());
}
}
void TracingMuxerImpl::ReadTracingSessionData(
TracingSessionGlobalID session_id,
std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto* consumer = FindConsumer(session_id);
if (!consumer)
return;
PERFETTO_DCHECK(!consumer->read_trace_callback_);
consumer->read_trace_callback_ = std::move(callback);
consumer->service_->ReadBuffers();
}
TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
TracingSessionGlobalID session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
for (RegisteredBackend& backend : backends_) {
for (auto& consumer : backend.consumers) {
if (consumer->session_id_ == session_id)
return consumer.get();
}
}
return nullptr;
}
// Can be called from any thread.
std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
DataSourceState* data_source) {
ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
return producer->service_->CreateTraceWriter(data_source->buffer_id);
}
// This is called via the public API Tracing::NewTrace().
// Can be called from any thread.
std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
BackendType backend_type) {
TracingSessionGlobalID session_id = ++next_tracing_session_id_;
// |backend_type| can only specify one backend, not an OR-ed mask.
PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
// Capturing |this| is fine because the TracingMuxer is a leaky singleton.
task_runner_->PostTask([this, backend_type, session_id] {
for (RegisteredBackend& backend : backends_) {
if (backend_type && backend.type != backend_type)
continue;
backend.consumers.emplace_back(
new ConsumerImpl(this, backend.id, session_id));
auto& consumer = backend.consumers.back();
TracingBackend::ConnectConsumerArgs conn_args;
conn_args.consumer = consumer.get();
conn_args.task_runner = task_runner_.get();
consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
return;
}
PERFETTO_ELOG(
"Cannot create tracing session, no tracing backend ready for type=%d",
backend_type);
});
return std::unique_ptr<TracingSession>(
new TracingSessionImpl(this, session_id));
}
void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
if (instance_)
PERFETTO_FATAL("Tracing already initialized");
instance_ = new TracingMuxerImpl(args);
}
TracingMuxer::~TracingMuxer() = default;
static_assert(std::is_same<internal::BufferId, BufferID>::value,
"public's BufferId and tracing/core's BufferID diverged");
} // namespace internal
} // namespace perfetto