blob: 1431c519c79bf2a1a3b473081a1e0d171898814d [file] [log] [blame]
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#include "base/unix_task_runner.h"
18
19#include "base/build_config.h"
20
21#include <fcntl.h>
22#include <unistd.h>
23
24namespace perfetto {
25namespace base {
26
27UnixTaskRunner::UnixTaskRunner() {
28 // Create a self-pipe which is used to wake up the main thread from inside
29 // poll(2).
30 int pipe_fds[2];
31 PERFETTO_CHECK(pipe(pipe_fds) == 0);
32
33 // Make the pipe non-blocking so that we never block the waking thread (either
34 // the main thread or another one) when scheduling a wake-up.
35 for (auto fd : pipe_fds) {
36 int flags = fcntl(fd, F_GETFL, 0);
37 PERFETTO_CHECK(flags != -1);
38 PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
39 PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
40 }
41 control_read_.reset(pipe_fds[0]);
42 control_write_.reset(pipe_fds[1]);
43
44#if BUILDFLAG(OS_LINUX)
45 // We are never expecting to have more than a few bytes in the wake-up pipe.
46 // Reduce the buffer size on Linux. Note that this gets rounded up to the page
47 // size.
48 PERFETTO_CHECK(fcntl(control_read_.get(), F_SETPIPE_SZ, 1) > 0);
49#endif
50
51 AddFileDescriptorWatch(control_read_.get(), [] {
52 // Not reached -- see PostFileDescriptorWatches().
53 PERFETTO_DCHECK(false);
54 });
55}
56
57UnixTaskRunner::~UnixTaskRunner() = default;
58
59UnixTaskRunner::TimePoint UnixTaskRunner::GetTime() const {
60 return std::chrono::steady_clock::now();
61}
62
63void UnixTaskRunner::WakeUp() {
64 const char dummy = 'P';
65 if (write(control_write_.get(), &dummy, 1) <= 0 && errno != EAGAIN)
66 PERFETTO_DPLOG("write()");
67}
68
69void UnixTaskRunner::Run() {
70 PERFETTO_DCHECK_THREAD(thread_checker_);
71 quit_ = false;
72 while (true) {
Sami Kyostilac25f8372017-11-22 11:15:41 +000073 int poll_timeout_ms;
74 {
75 std::lock_guard<std::mutex> lock(lock_);
76 if (quit_)
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000077 return;
Sami Kyostilac25f8372017-11-22 11:15:41 +000078 poll_timeout_ms = static_cast<int>(GetDelayToNextTaskLocked().count());
79 UpdateWatchTasksLocked();
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000080 }
Sami Kyostilac25f8372017-11-22 11:15:41 +000081 int ret = PERFETTO_EINTR(poll(
82 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
83 PERFETTO_CHECK(ret >= 0);
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000084
Sami Kyostilac25f8372017-11-22 11:15:41 +000085 // To avoid starvation we always interleave all types of tasks -- immediate,
86 // delayed and file descriptor watches.
87 PostFileDescriptorWatches();
88 RunImmediateAndDelayedTask();
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000089 }
Sami Kyostila2c6c2f52017-11-21 16:08:16 +000090}
91
92void UnixTaskRunner::Quit() {
93 {
94 std::lock_guard<std::mutex> lock(lock_);
95 quit_ = true;
96 }
97 WakeUp();
98}
99
100void UnixTaskRunner::UpdateWatchTasksLocked() {
101 PERFETTO_DCHECK_THREAD(thread_checker_);
102 if (!watch_tasks_changed_)
103 return;
104 watch_tasks_changed_ = false;
105 poll_fds_.clear();
106 for (const auto& it : watch_tasks_)
107 poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
108}
109
110void UnixTaskRunner::RunImmediateAndDelayedTask() {
111 // TODO(skyostil): Add a separate work queue in case in case locking overhead
112 // becomes an issue.
113 std::function<void()> immediate_task;
114 std::function<void()> delayed_task;
115 auto now = GetTime();
116 {
117 std::lock_guard<std::mutex> lock(lock_);
118 if (!immediate_tasks_.empty()) {
119 immediate_task = std::move(immediate_tasks_.front());
120 immediate_tasks_.pop_front();
121 }
122 if (!delayed_tasks_.empty()) {
123 auto it = delayed_tasks_.begin();
124 if (now >= it->first) {
125 delayed_task = std::move(it->second);
126 delayed_tasks_.erase(it);
127 }
128 }
129 }
130 if (immediate_task)
131 immediate_task();
132 if (delayed_task)
133 delayed_task();
134}
135
136void UnixTaskRunner::PostFileDescriptorWatches() {
137 PERFETTO_DCHECK_THREAD(thread_checker_);
138 for (size_t i = 0; i < poll_fds_.size(); i++) {
139 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
140 continue;
141 poll_fds_[i].revents = 0;
142
143 // The wake-up event is handled inline to avoid an infinite recursion of
144 // posted tasks.
145 if (poll_fds_[i].fd == control_read_.get()) {
146 // Drain the byte(s) written to the wake-up pipe. We can potentially read
147 // more than one byte if several wake-ups have been scheduled.
148 char buffer[16];
149 if (read(control_read_.get(), &buffer[0], sizeof(buffer)) <= 0 &&
150 errno != EAGAIN) {
151 PERFETTO_DPLOG("read()");
152 }
153 continue;
154 }
155
156 // Binding to |this| is safe since we are the only object executing the
157 // task.
158 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
159 poll_fds_[i].fd));
160 }
161}
162
163void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
164 std::function<void()> task;
165 {
166 std::lock_guard<std::mutex> lock(lock_);
167 auto it = watch_tasks_.find(fd);
168 if (it == watch_tasks_.end())
169 return;
170 task = it->second;
171 }
172 task();
173}
174
175UnixTaskRunner::TimeDurationMs UnixTaskRunner::GetDelayToNextTaskLocked()
176 const {
177 PERFETTO_DCHECK_THREAD(thread_checker_);
178 if (!immediate_tasks_.empty())
179 return TimeDurationMs(0);
180 if (!delayed_tasks_.empty()) {
181 return std::max(TimeDurationMs(0),
182 std::chrono::duration_cast<TimeDurationMs>(
183 delayed_tasks_.begin()->first - GetTime()));
184 }
185 return TimeDurationMs(-1);
186}
187
188void UnixTaskRunner::PostTask(std::function<void()> task) {
189 bool was_empty;
190 {
191 std::lock_guard<std::mutex> lock(lock_);
192 was_empty = immediate_tasks_.empty();
193 immediate_tasks_.push_back(std::move(task));
194 }
195 if (was_empty)
196 WakeUp();
197}
198
199void UnixTaskRunner::PostDelayedTask(std::function<void()> task, int delay_ms) {
200 PERFETTO_DCHECK(delay_ms >= 0);
201 auto runtime = GetTime() + std::chrono::milliseconds(delay_ms);
202 {
203 std::lock_guard<std::mutex> lock(lock_);
204 delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
205 }
206 WakeUp();
207}
208
209void UnixTaskRunner::AddFileDescriptorWatch(int fd,
210 std::function<void()> task) {
211 {
212 std::lock_guard<std::mutex> lock(lock_);
213 PERFETTO_DCHECK(!watch_tasks_.count(fd));
214 watch_tasks_[fd] = std::move(task);
215 watch_tasks_changed_ = true;
216 }
217 WakeUp();
218}
219
220void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
221 {
222 std::lock_guard<std::mutex> lock(lock_);
223 PERFETTO_DCHECK(watch_tasks_.count(fd));
224 watch_tasks_.erase(fd);
225 watch_tasks_changed_ = true;
226 }
227 // No need to schedule a wake-up for this.
228}
229
230} // namespace base
231} // namespace perfetto