blob: ba162ab4c909653458aff3b3f0ff16badabaff3a [file] [log] [blame]
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Oystein Eftevaagdd727e42017-12-05 08:49:55 -080017#include "perfetto_base/unix_task_runner.h"
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000018
Oystein Eftevaagdd727e42017-12-05 08:49:55 -080019#include "perfetto_base/build_config.h"
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000020
21#include <fcntl.h>
Sami Kyostila78b72d32017-11-23 18:08:57 +000022#include <stdlib.h>
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000023#include <unistd.h>
24
25namespace perfetto {
26namespace base {
27
28UnixTaskRunner::UnixTaskRunner() {
29 // Create a self-pipe which is used to wake up the main thread from inside
30 // poll(2).
31 int pipe_fds[2];
32 PERFETTO_CHECK(pipe(pipe_fds) == 0);
33
34 // Make the pipe non-blocking so that we never block the waking thread (either
35 // the main thread or another one) when scheduling a wake-up.
36 for (auto fd : pipe_fds) {
37 int flags = fcntl(fd, F_GETFL, 0);
38 PERFETTO_CHECK(flags != -1);
39 PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
40 PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
41 }
42 control_read_.reset(pipe_fds[0]);
43 control_write_.reset(pipe_fds[1]);
44
45#if BUILDFLAG(OS_LINUX)
46 // We are never expecting to have more than a few bytes in the wake-up pipe.
47 // Reduce the buffer size on Linux. Note that this gets rounded up to the page
48 // size.
49 PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0);
50#endif
51
52 AddFileDescriptorWatch(control_read_.get(), [] {
53 // Not reached -- see PostFileDescriptorWatches().
54 PERFETTO_DCHECK(false);
55 });
56}
57
58UnixTaskRunner::~UnixTaskRunner() = default;
59
60UnixTaskRunner::TimePoint UnixTaskRunner::GetTime() const {
61 return std::chrono::steady_clock::now();
62}
63
64void UnixTaskRunner::WakeUp() {
65 const char dummy = 'P';
66 if (write(control_write_.get(), &dummy, 1) <= 0 && errno != EAGAIN)
67 PERFETTO_DPLOG("write()");
68}
69
70void UnixTaskRunner::Run() {
71 PERFETTO_DCHECK_THREAD(thread_checker_);
72 quit_ = false;
73 while (true) {
Sami Kyostilac25f8372017-11-22 11:15:41 +000074 int poll_timeout_ms;
75 {
76 std::lock_guard<std::mutex> lock(lock_);
77 if (quit_)
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000078 return;
Sami Kyostilac25f8372017-11-22 11:15:41 +000079 poll_timeout_ms = static_cast<int>(GetDelayToNextTaskLocked().count());
80 UpdateWatchTasksLocked();
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000081 }
Sami Kyostilac25f8372017-11-22 11:15:41 +000082 int ret = PERFETTO_EINTR(poll(
83 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
84 PERFETTO_CHECK(ret >= 0);
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000085
Sami Kyostilac25f8372017-11-22 11:15:41 +000086 // To avoid starvation we always interleave all types of tasks -- immediate,
87 // delayed and file descriptor watches.
88 PostFileDescriptorWatches();
89 RunImmediateAndDelayedTask();
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000090 }
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000091}
92
93void UnixTaskRunner::Quit() {
94 {
95 std::lock_guard<std::mutex> lock(lock_);
96 quit_ = true;
97 }
98 WakeUp();
99}
100
Sami Kyostilaeaed9562017-11-22 12:48:10 +0000101bool UnixTaskRunner::IsIdleForTesting() {
102 std::lock_guard<std::mutex> lock(lock_);
103 return immediate_tasks_.empty();
104}
105
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000106void UnixTaskRunner::UpdateWatchTasksLocked() {
107 PERFETTO_DCHECK_THREAD(thread_checker_);
108 if (!watch_tasks_changed_)
109 return;
110 watch_tasks_changed_ = false;
111 poll_fds_.clear();
Sami Kyostila78b72d32017-11-23 18:08:57 +0000112 for (auto& it : watch_tasks_) {
113 it.second.poll_fd_index = poll_fds_.size();
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000114 poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
Sami Kyostila78b72d32017-11-23 18:08:57 +0000115 }
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000116}
117
118void UnixTaskRunner::RunImmediateAndDelayedTask() {
119 // TODO(skyostil): Add a separate work queue in case in case locking overhead
120 // becomes an issue.
121 std::function<void()> immediate_task;
122 std::function<void()> delayed_task;
123 auto now = GetTime();
124 {
125 std::lock_guard<std::mutex> lock(lock_);
126 if (!immediate_tasks_.empty()) {
127 immediate_task = std::move(immediate_tasks_.front());
128 immediate_tasks_.pop_front();
129 }
130 if (!delayed_tasks_.empty()) {
131 auto it = delayed_tasks_.begin();
132 if (now >= it->first) {
133 delayed_task = std::move(it->second);
134 delayed_tasks_.erase(it);
135 }
136 }
137 }
138 if (immediate_task)
139 immediate_task();
140 if (delayed_task)
141 delayed_task();
142}
143
144void UnixTaskRunner::PostFileDescriptorWatches() {
145 PERFETTO_DCHECK_THREAD(thread_checker_);
146 for (size_t i = 0; i < poll_fds_.size(); i++) {
147 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
148 continue;
149 poll_fds_[i].revents = 0;
150
151 // The wake-up event is handled inline to avoid an infinite recursion of
152 // posted tasks.
153 if (poll_fds_[i].fd == control_read_.get()) {
154 // Drain the byte(s) written to the wake-up pipe. We can potentially read
155 // more than one byte if several wake-ups have been scheduled.
156 char buffer[16];
157 if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 &&
158 errno != EAGAIN) {
159 PERFETTO_DPLOG("read()");
160 }
161 continue;
162 }
163
164 // Binding to |this| is safe since we are the only object executing the
165 // task.
166 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
167 poll_fds_[i].fd));
Sami Kyostila78b72d32017-11-23 18:08:57 +0000168
169 // Make the fd negative while a posted task is pending. This makes poll(2)
170 // ignore the fd.
171 PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
172 poll_fds_[i].fd = -poll_fds_[i].fd;
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000173 }
174}
175
176void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
177 std::function<void()> task;
178 {
179 std::lock_guard<std::mutex> lock(lock_);
180 auto it = watch_tasks_.find(fd);
181 if (it == watch_tasks_.end())
182 return;
Sami Kyostila78b72d32017-11-23 18:08:57 +0000183 // Make poll(2) pay attention to the fd again. Since another thread may have
184 // updated this watch we need to refresh the set first.
185 UpdateWatchTasksLocked();
186 size_t fd_index = it->second.poll_fd_index;
187 PERFETTO_DCHECK(fd_index < poll_fds_.size());
188 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
189 poll_fds_[fd_index].fd = fd;
190 task = it->second.callback;
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000191 }
192 task();
193}
194
195UnixTaskRunner::TimeDurationMs UnixTaskRunner::GetDelayToNextTaskLocked()
196 const {
197 PERFETTO_DCHECK_THREAD(thread_checker_);
198 if (!immediate_tasks_.empty())
199 return TimeDurationMs(0);
200 if (!delayed_tasks_.empty()) {
201 return std::max(TimeDurationMs(0),
202 std::chrono::duration_cast<TimeDurationMs>(
203 delayed_tasks_.begin()->first - GetTime()));
204 }
205 return TimeDurationMs(-1);
206}
207
208void UnixTaskRunner::PostTask(std::function<void()> task) {
209 bool was_empty;
210 {
211 std::lock_guard<std::mutex> lock(lock_);
212 was_empty = immediate_tasks_.empty();
213 immediate_tasks_.push_back(std::move(task));
214 }
215 if (was_empty)
216 WakeUp();
217}
218
219void UnixTaskRunner::PostDelayedTask(std::function<void()> task, int delay_ms) {
220 PERFETTO_DCHECK(delay_ms >= 0);
221 auto runtime = GetTime() + std::chrono::milliseconds(delay_ms);
222 {
223 std::lock_guard<std::mutex> lock(lock_);
224 delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
225 }
226 WakeUp();
227}
228
229void UnixTaskRunner::AddFileDescriptorWatch(int fd,
230 std::function<void()> task) {
Sami Kyostilaeaed9562017-11-22 12:48:10 +0000231 PERFETTO_DCHECK(fd >= 0);
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000232 {
233 std::lock_guard<std::mutex> lock(lock_);
234 PERFETTO_DCHECK(!watch_tasks_.count(fd));
Sami Kyostila78b72d32017-11-23 18:08:57 +0000235 watch_tasks_[fd] = {std::move(task), SIZE_MAX};
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000236 watch_tasks_changed_ = true;
237 }
238 WakeUp();
239}
240
241void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
Sami Kyostilaeaed9562017-11-22 12:48:10 +0000242 PERFETTO_DCHECK(fd >= 0);
Sami Kyostila2c6c2f52017-11-21 16:08:16 +0000243 {
244 std::lock_guard<std::mutex> lock(lock_);
245 PERFETTO_DCHECK(watch_tasks_.count(fd));
246 watch_tasks_.erase(fd);
247 watch_tasks_changed_ = true;
248 }
249 // No need to schedule a wake-up for this.
250}
251
252} // namespace base
253} // namespace perfetto