| /* |
| * Copyright (C) 2017 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "src/tracing/core/shared_memory_arbiter_impl.h" |
| |
| #include "perfetto/base/logging.h" |
| #include "perfetto/base/task_runner.h" |
| #include "perfetto/base/time.h" |
| #include "perfetto/ext/tracing/core/commit_data_request.h" |
| #include "perfetto/ext/tracing/core/shared_memory.h" |
| #include "perfetto/ext/tracing/core/startup_trace_writer_registry.h" |
| #include "src/tracing/core/null_trace_writer.h" |
| #include "src/tracing/core/trace_writer_impl.h" |
| |
| #include <limits> |
| #include <utility> |
| |
| namespace perfetto { |
| |
| using Chunk = SharedMemoryABI::Chunk; |
| |
| // static |
| SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout = |
| SharedMemoryABI::PageLayout::kPageDiv1; |
| |
| // static |
| std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance( |
| SharedMemory* shared_memory, |
| size_t page_size, |
| TracingService::ProducerEndpoint* producer_endpoint, |
| base::TaskRunner* task_runner) { |
| return std::unique_ptr<SharedMemoryArbiterImpl>( |
| new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(), |
| page_size, producer_endpoint, task_runner)); |
| } |
| |
| SharedMemoryArbiterImpl::SharedMemoryArbiterImpl( |
| void* start, |
| size_t size, |
| size_t page_size, |
| TracingService::ProducerEndpoint* producer_endpoint, |
| base::TaskRunner* task_runner) |
| : task_runner_(task_runner), |
| producer_endpoint_(producer_endpoint), |
| shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size), |
| active_writer_ids_(kMaxWriterID), |
| weak_ptr_factory_(this) {} |
| |
| Chunk SharedMemoryArbiterImpl::GetNewChunk( |
| const SharedMemoryABI::ChunkHeader& header, |
| BufferExhaustedPolicy buffer_exhausted_policy, |
| size_t size_hint) { |
| PERFETTO_DCHECK(size_hint == 0); // Not implemented yet. |
| int stall_count = 0; |
| unsigned stall_interval_us = 0; |
| static const unsigned kMaxStallIntervalUs = 100000; |
| static const int kLogAfterNStalls = 3; |
| static const int kFlushCommitsAfterEveryNStalls = 2; |
| static const int kAssertAtNStalls = 100; |
| |
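| // Keep looping until a free chunk is acquired. Under |
| // BufferExhaustedPolicy::kStall this retries with exponential backoff (see |
| // the bottom of the loop); under kDrop it gives up after a single scan of |
| // all pages and returns an invalid Chunk. |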
| for (;;) { |
| // TODO(primiano): Probably this lock is not really required and this code |
| // could be rewritten leveraging only the Try* atomic operations in |
| // SharedMemoryABI. But let's not be too adventurous for the moment. |
| { |
| std::unique_lock<std::mutex> scoped_lock(lock_); |
| |
| // If more than half of the SMB is filled with completed chunks for which |
| // we haven't notified the service yet (i.e. they are still enqueued in |
| // |commit_data_req_|), force a synchronous CommitDataRequest() even if we |
| // acquire a chunk, to reduce the likelihood of stalling the writer. |
| // |
| // We can only do this if we are writing on the same thread that the |
| // producer endpoint is accessed on, since we cannot ask the producer |
| // endpoint to commit synchronously from a different thread. Attempting to |
| // flush synchronously on another thread leads to subtle bugs caused by |
| // out-of-order commit requests (crbug.com/919187#c28). |
| bool should_commit_synchronously = |
| buffer_exhausted_policy == BufferExhaustedPolicy::kStall && |
| commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2 && |
| task_runner_->RunsTasksOnCurrentThread(); |
| |
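| // Scan all SMB pages round-robin, starting from the page that was used |
| // last time. |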
| const size_t initial_page_idx = page_idx_; |
| for (size_t i = 0; i < shmem_abi_.num_pages(); i++) { |
| page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages(); |
| bool is_new_page = false; |
| |
| // TODO(primiano): make the page layout dynamic. |
| auto layout = SharedMemoryArbiterImpl::default_page_layout; |
| |
| if (shmem_abi_.is_page_free(page_idx_)) { |
| // TODO(primiano): Use the |size_hint| here to decide the layout. |
| is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout); |
| } |
| uint32_t free_chunks; |
| if (is_new_page) { |
| free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1; |
| } else { |
| free_chunks = shmem_abi_.GetFreeChunks(page_idx_); |
| } |
| |
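| // Walk the free-chunks bitmap from the least significant bit; bit N set |
| // means chunk N of this page is free. E.g. with a 4-chunk page layout and |
| // chunks 0 and 2 free, |free_chunks| == 0b0101 and the loop below attempts |
| // to acquire chunks 0 and 2. |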
| for (uint32_t chunk_idx = 0; free_chunks; |
| chunk_idx++, free_chunks >>= 1) { |
| if (!(free_chunks & 1)) |
| continue; |
| // We found a free chunk. |
| Chunk chunk = shmem_abi_.TryAcquireChunkForWriting( |
| page_idx_, chunk_idx, &header); |
| if (!chunk.is_valid()) |
| continue; |
| if (stall_count > kLogAfterNStalls) { |
| PERFETTO_LOG("Recovered from stall after %d iterations", |
| stall_count); |
| } |
| |
| if (should_commit_synchronously) { |
| // We can't flush while holding the lock. |
| scoped_lock.unlock(); |
| FlushPendingCommitDataRequests(); |
| return chunk; |
| } else { |
| return chunk; |
| } |
| } |
| } |
| } // std::unique_lock<std::mutex> |
| |
| if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) { |
| PERFETTO_DLOG("Shared memory buffer exhausted, returning invalid Chunk!"); |
| return Chunk(); |
| } |
| |
| // All chunks are taken (either kBeingWritten by us or kBeingRead by the |
| // Service). |
| if (stall_count++ == kLogAfterNStalls) { |
| PERFETTO_LOG("Shared memory buffer overrun! Stalling"); |
| } |
| |
| if (stall_count == kAssertAtNStalls) { |
| PERFETTO_FATAL( |
| "Shared memory buffer max stall count exceeded; possible deadlock"); |
| } |
| |
| // If the IPC thread itself is stalled because the current process has |
| // filled up the SMB, we need to make sure that the service can process and |
| // purge the chunks written by our process, by flushing any pending commit |
| // requests. Because other threads in our process can continue to |
| // concurrently grab, fill and commit any chunks purged by the service, it |
| // is possible that the SMB remains full and the IPC thread remains stalled, |
| // needing to flush the concurrently queued-up commits again. This is |
| // particularly likely with an in-process perfetto service, where the IPC |
| // thread is the service thread. To avoid remaining stalled forever in such |
| // a situation, we attempt to flush periodically, after every N stalls. |
| if (stall_count % kFlushCommitsAfterEveryNStalls == 0 && |
| task_runner_->RunsTasksOnCurrentThread()) { |
| // TODO(primiano): sending the IPC synchronously is a temporary workaround |
| // until the backpressure logic in probes_producer is sorted out. Until |
| // then the risk is that we stall the message loop waiting for the tracing |
| // service to consume the shared memory buffer (SMB) and, for this reason, |
| // never run the task that tells the service to purge the SMB. This must |
| // happen iff we are on the IPC thread: not doing it there can cause |
| // deadlocks, while doing it on the wrong thread causes out-of-order data |
| // commits (crbug.com/919187#c28). |
| FlushPendingCommitDataRequests(); |
| } else { |
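| // Back off exponentially between retries, capping the sleep interval at |
| // kMaxStallIntervalUs (100 ms): 0, 8, 72, 584, ... microseconds. |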
| base::SleepMicroseconds(stall_interval_us); |
| stall_interval_us = |
| std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8); |
| } |
| } |
| } |
| |
| void SharedMemoryArbiterImpl::ReturnCompletedChunk(Chunk chunk, |
| BufferID target_buffer, |
| PatchList* patch_list) { |
| PERFETTO_DCHECK(chunk.is_valid()); |
| const WriterID writer_id = chunk.writer_id(); |
| UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer, |
| patch_list); |
| } |
| |
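| // Sends completed patches for chunks that have already been returned to the |
| // service, e.g. to backfill the size fields of messages that ended up |
| // spanning multiple chunks. Only the fully patched entries at the head of |
| // |patch_list| are sent. |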
| void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id, |
| BufferID target_buffer, |
| PatchList* patch_list) { |
| PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched()); |
| UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list); |
| } |
| |
| void SharedMemoryArbiterImpl::UpdateCommitDataRequest(Chunk chunk, |
| WriterID writer_id, |
| BufferID target_buffer, |
| PatchList* patch_list) { |
| // Note: chunk will be invalid if the call came from SendPatches(). |
| bool should_post_callback = false; |
| base::WeakPtr<SharedMemoryArbiterImpl> weak_this; |
| { |
| std::lock_guard<std::mutex> scoped_lock(lock_); |
| |
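| // The request is created lazily by the first commit after the previous |
| // flush; that same commit posts the single deferred flush task below. |
| // Subsequent commits just keep appending to the existing request. |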
| if (!commit_data_req_) { |
| commit_data_req_.reset(new CommitDataRequest()); |
| weak_this = weak_ptr_factory_.GetWeakPtr(); |
| should_post_callback = true; |
| } |
| |
| // If a valid chunk is specified, return it and attach it to the request. |
| if (chunk.is_valid()) { |
| PERFETTO_DCHECK(chunk.writer_id() == writer_id); |
| uint8_t chunk_idx = chunk.chunk_idx(); |
| bytes_pending_commit_ += chunk.size(); |
| size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk)); |
| |
| // DO NOT access |chunk| after this point: it has been std::move()-d above. |
| |
| CommitDataRequest::ChunksToMove* ctm = |
| commit_data_req_->add_chunks_to_move(); |
| ctm->set_page(static_cast<uint32_t>(page_idx)); |
| ctm->set_chunk(chunk_idx); |
| ctm->set_target_buffer(target_buffer); |
| } |
| |
| // Get the completed patches for previous chunks from the |patch_list| |
| // and attach them. |
| ChunkID last_chunk_id = 0; // 0 is irrelevant but keeps the compiler happy. |
| CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr; |
| while (!patch_list->empty() && patch_list->front().is_patched()) { |
| if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) { |
| last_chunk_req = commit_data_req_->add_chunks_to_patch(); |
| last_chunk_req->set_writer_id(writer_id); |
| last_chunk_id = patch_list->front().chunk_id; |
| last_chunk_req->set_chunk_id(last_chunk_id); |
| last_chunk_req->set_target_buffer(target_buffer); |
| } |
| auto* patch_req = last_chunk_req->add_patches(); |
| patch_req->set_offset(patch_list->front().offset); |
| patch_req->set_data(&patch_list->front().size_field[0], |
| patch_list->front().size_field.size()); |
| patch_list->pop_front(); |
| } |
| // Patches are enqueued in the |patch_list| in order and are sent to the |
| // service when the chunk is returned. The only case in which the current |
| // patch list is incomplete is when there is an unpatched entry at the head |
| // of the |patch_list| that belongs to the same ChunkID as the last chunk we |
| // are about to send to the service. |
| if (last_chunk_req && !patch_list->empty() && |
| patch_list->front().chunk_id == last_chunk_id) { |
| last_chunk_req->set_has_more_patches(true); |
| } |
| } // scoped_lock(lock_) |
| |
| if (should_post_callback) { |
| task_runner_->PostTask([weak_this] { |
| if (weak_this) |
| weak_this->FlushPendingCommitDataRequests(); |
| }); |
| } |
| } |
| |
| // This function is quite subtle. When making changes keep in mind these two |
| // challenges: |
| // 1) If the producer stalls and we happen to be on the |task_runner_| IPC |
| // thread (or, for in-process cases, on the same thread where |
| // TracingServiceImpl lives), the CommitData() call must be synchronous and |
| // not posted, to avoid deadlocks. |
| // 2) When different threads hit this function, we must guarantee that we don't |
| // accidentally make commits out of order. See commit 4e4fe8f56ef and |
| // crbug.com/919187 for more context. |
| void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests( |
| std::function<void()> callback) { |
| // May be called by TraceWriterImpl on any thread. |
| if (!task_runner_->RunsTasksOnCurrentThread()) { |
| auto weak_this = weak_ptr_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this, callback] { |
| if (weak_this) |
| weak_this->FlushPendingCommitDataRequests(std::move(callback)); |
| }); |
| return; |
| } |
| |
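| // Detach the pending request while holding the lock, but perform the actual |
| // CommitData() call outside of it. |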
| std::shared_ptr<CommitDataRequest> req; |
| { |
| std::lock_guard<std::mutex> scoped_lock(lock_); |
| req = std::move(commit_data_req_); |
| bytes_pending_commit_ = 0; |
| } |
| |
| // |req| can be a nullptr if |commit_data_req_| was already cleared, for |
| // example by a forced sync flush in GetNewChunk(). |
| if (req) { |
| producer_endpoint_->CommitData(*req, callback); |
| } else if (callback) { |
| // If |req| was nullptr, it means that an enqueued deferred commit was |
| // executed just before this. In that case, send an empty commit request to |
| // the service anyway, just to linearize with it and guarantee to the caller |
| // that the data has been flushed into the service. |
| producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback)); |
| } |
| } |
| |
| std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter( |
| BufferID target_buffer, |
| BufferExhaustedPolicy buffer_exhausted_policy) { |
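| // Allocate a writer ID under the lock. An ID of 0 means that all writer IDs |
| // are currently in use; in that case fall back to a NullTraceWriter, whose |
| // data never reaches the service. |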
| WriterID id; |
| { |
| std::lock_guard<std::mutex> scoped_lock(lock_); |
| id = active_writer_ids_.Allocate(); |
| } |
| if (!id) |
| return std::unique_ptr<TraceWriter>(new NullTraceWriter()); |
| auto weak_this = weak_ptr_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this, id, target_buffer] { |
| if (weak_this) |
| weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer); |
| }); |
| return std::unique_ptr<TraceWriter>( |
| new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy)); |
| } |
| |
| void SharedMemoryArbiterImpl::BindStartupTraceWriterRegistry( |
| std::unique_ptr<StartupTraceWriterRegistry> registry, |
| BufferID target_buffer) { |
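| // If we are not on the arbiter's task runner, re-post this call to it. The |
| // posted task temporarily owns |registry| via a raw pointer, because a |
| // std::unique_ptr cannot be captured by the copyable task std::function. |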
| if (!task_runner_->RunsTasksOnCurrentThread()) { |
| auto weak_this = weak_ptr_factory_.GetWeakPtr(); |
| auto* raw_reg = registry.release(); |
| task_runner_->PostTask([weak_this, raw_reg, target_buffer]() { |
| std::unique_ptr<StartupTraceWriterRegistry> owned_reg(raw_reg); |
| if (!weak_this) |
| return; |
| weak_this->BindStartupTraceWriterRegistry(std::move(owned_reg), |
| target_buffer); |
| }); |
| return; |
| } |
| |
| // The registry will be owned by the arbiter, so it's safe to capture |this| |
| // in the callback. |
| auto on_bound_callback = [this](StartupTraceWriterRegistry* bound_registry) { |
| std::unique_ptr<StartupTraceWriterRegistry> registry_to_delete; |
| { |
| std::lock_guard<std::mutex> scoped_lock(lock_); |
| |
| for (auto it = startup_trace_writer_registries_.begin(); |
| it != startup_trace_writer_registries_.end(); it++) { |
| if (it->get() == bound_registry) { |
| // We can't delete the registry while the arbiter's lock is held |
| // (to avoid lock inversion). |
| registry_to_delete = std::move(*it); |
| startup_trace_writer_registries_.erase(it); |
| break; |
| } |
| } |
| } |
| |
| // The registry should have been in |startup_trace_writer_registries_|. |
| PERFETTO_DCHECK(registry_to_delete); |
| registry_to_delete.reset(); |
| }; |
| registry->BindToArbiter(this, target_buffer, task_runner_, on_bound_callback); |
| std::lock_guard<std::mutex> scoped_lock(lock_); |
| startup_trace_writer_registries_.push_back(std::move(registry)); |
| } |
| |
| void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) { |
| bool should_post_commit_task = false; |
| { |
| std::lock_guard<std::mutex> scoped_lock(lock_); |
| // If a |commit_data_req_| already exists, it means that somebody else has |
| // already posted a FlushPendingCommitDataRequests() task. |
| if (!commit_data_req_) { |
| commit_data_req_.reset(new CommitDataRequest()); |
| should_post_commit_task = true; |
| } else { |
| // If there is another request queued and it is also a reply to a flush |
| // request, reply with the highest id. |
| req_id = std::max(req_id, commit_data_req_->flush_request_id()); |
| } |
| commit_data_req_->set_flush_request_id(req_id); |
| } |
| if (should_post_commit_task) { |
| auto weak_this = weak_ptr_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this] { |
| if (weak_this) |
| weak_this->FlushPendingCommitDataRequests(); |
| }); |
| } |
| } |
| |
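| // Releases a writer ID previously handed out by CreateTraceWriter(): posts |
| // the UnregisterTraceWriter() notification to the service and frees the ID |
| // for reuse. |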
| void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) { |
| auto weak_this = weak_ptr_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this, id] { |
| if (weak_this) |
| weak_this->producer_endpoint_->UnregisterTraceWriter(id); |
| }); |
| |
| std::lock_guard<std::mutex> scoped_lock(lock_); |
| active_writer_ids_.Free(id); |
| } |
| |
| } // namespace perfetto |