Extract base::Pipe

The usage of pipe() in the codebase hit the
three-strikes-and-you-refactor rule. Also, most call sites were
forgetting to set the FD_CLOEXEC flag.

Test: already covered by other tests that use pipe
Change-Id: Ia5203dafe446e6555aa7b738ccc91d67ef104bd3
diff --git a/Android.bp b/Android.bp
index f6762ed..0946c40 100644
--- a/Android.bp
+++ b/Android.bp
@@ -37,6 +37,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -137,6 +138,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -191,6 +193,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -322,6 +325,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -469,6 +473,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -4286,6 +4291,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
@@ -4486,6 +4492,7 @@
     "src/base/optional_unittest.cc",
     "src/base/paged_memory.cc",
     "src/base/paged_memory_unittest.cc",
+    "src/base/pipe.cc",
     "src/base/scoped_file_unittest.cc",
     "src/base/string_splitter.cc",
     "src/base/string_splitter_unittest.cc",
@@ -4725,6 +4732,7 @@
     "src/base/file_utils.cc",
     "src/base/metatrace.cc",
     "src/base/paged_memory.cc",
+    "src/base/pipe.cc",
     "src/base/string_splitter.cc",
     "src/base/string_utils.cc",
     "src/base/temp_file.cc",
diff --git a/include/perfetto/base/BUILD.gn b/include/perfetto/base/BUILD.gn
index 05baf78..16c56ff 100644
--- a/include/perfetto/base/BUILD.gn
+++ b/include/perfetto/base/BUILD.gn
@@ -26,6 +26,7 @@
     "metatrace.h",
     "optional.h",
     "paged_memory.h",
+    "pipe.h",
     "scoped_file.h",
     "small_set.h",
     "string_splitter.h",
diff --git a/include/perfetto/base/pipe.h b/include/perfetto/base/pipe.h
new file mode 100644
index 0000000..a1fde3c
--- /dev/null
+++ b/include/perfetto/base/pipe.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_BASE_PIPE_H_
+#define INCLUDE_PERFETTO_BASE_PIPE_H_
+
+#include "perfetto/base/scoped_file.h"
+
+namespace perfetto {
+namespace base {
+
+class Pipe {
+ public:
+  enum Flags {
+    kBothBlock = 0,
+    kBothNonBlock,
+    kRdNonBlock,
+    kWrNonBlock,
+  };
+
+  static Pipe Create(Flags = kBothBlock);
+
+  Pipe();
+  Pipe(Pipe&&) noexcept;
+  Pipe& operator=(Pipe&&);
+
+  ScopedFile rd;
+  ScopedFile wr;
+};
+
+}  // namespace base
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_BASE_PIPE_H_
diff --git a/src/base/BUILD.gn b/src/base/BUILD.gn
index 3dd1ecb..06c88b5 100644
--- a/src/base/BUILD.gn
+++ b/src/base/BUILD.gn
@@ -38,6 +38,7 @@
   if (!is_win) {
     sources += [
       "event.cc",
+      "pipe.cc",
       "temp_file.cc",
       "unix_task_runner.cc",
     ]
diff --git a/src/base/event.cc b/src/base/event.cc
index e2ced94..4fa6ec0 100644
--- a/src/base/event.cc
+++ b/src/base/event.cc
@@ -19,6 +19,7 @@
 
 #include "perfetto/base/event.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 
 #if PERFETTO_USE_EVENTFD()
 #include <sys/eventfd.h>
@@ -32,19 +33,11 @@
   fd_.reset(eventfd(/* start value */ 0, EFD_CLOEXEC | EFD_NONBLOCK));
   PERFETTO_CHECK(fd_);
 #else
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(pipe_fds) == 0);
-
   // Make the pipe non-blocking so that we never block the waking thread (either
   // the main thread or another one) when scheduling a wake-up.
-  for (auto fd : pipe_fds) {
-    int flags = fcntl(fd, F_GETFL, 0);
-    PERFETTO_CHECK(flags != -1);
-    PERFETTO_CHECK(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
-    PERFETTO_CHECK(fcntl(fd, F_SETFD, FD_CLOEXEC) == 0);
-  }
-  fd_.reset(pipe_fds[0]);
-  write_fd_.reset(pipe_fds[1]);
+  Pipe pipe = Pipe::Create(Pipe::kBothNonBlock);
+  fd_ = std::move(pipe.rd);
+  write_fd_ = std::move(pipe.wr);
 #endif  // !PERFETTO_USE_EVENTFD()
 }
 
diff --git a/src/base/pipe.cc b/src/base/pipe.cc
new file mode 100644
index 0000000..db7fd3a
--- /dev/null
+++ b/src/base/pipe.cc
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/base/pipe.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "perfetto/base/logging.h"
+
+namespace perfetto {
+namespace base {
+
+Pipe::Pipe() = default;
+Pipe::Pipe(Pipe&&) noexcept = default;
+Pipe& Pipe::operator=(Pipe&&) = default;
+
+Pipe Pipe::Create(Flags flags) {
+  int fds[2];
+  PERFETTO_CHECK(pipe(fds) == 0);
+  Pipe p;
+  p.rd.reset(fds[0]);
+  p.wr.reset(fds[1]);
+
+  PERFETTO_CHECK(fcntl(*p.rd, F_SETFD, FD_CLOEXEC) == 0);
+  PERFETTO_CHECK(fcntl(*p.wr, F_SETFD, FD_CLOEXEC) == 0);
+
+  if (flags == kBothNonBlock || flags == kRdNonBlock) {
+    int cur_flags = fcntl(*p.rd, F_GETFL, 0);
+    PERFETTO_CHECK(cur_flags >= 0);
+    PERFETTO_CHECK(fcntl(*p.rd, F_SETFL, cur_flags | O_NONBLOCK) == 0);
+  }
+
+  if (flags == kBothNonBlock || flags == kWrNonBlock) {
+    int cur_flags = fcntl(*p.wr, F_GETFL, 0);
+    PERFETTO_CHECK(cur_flags >= 0);
+    PERFETTO_CHECK(fcntl(*p.wr, F_SETFL, cur_flags | O_NONBLOCK) == 0);
+  }
+  return p;
+}
+
+}  // namespace base
+}  // namespace perfetto
diff --git a/src/base/task_runner_unittest.cc b/src/base/task_runner_unittest.cc
index 0bde467..eb6309a 100644
--- a/src/base/task_runner_unittest.cc
+++ b/src/base/task_runner_unittest.cc
@@ -28,6 +28,7 @@
 #include <thread>
 
 #include "perfetto/base/file_utils.h"
+#include "perfetto/base/pipe.h"
 
 namespace perfetto {
 namespace base {
@@ -47,28 +48,21 @@
 #endif
 TYPED_TEST_CASE(TaskRunnerTest, TaskRunnerTypes);
 
-struct Pipe {
-  Pipe() {
-    int pipe_fds[2];
-    PERFETTO_DCHECK(pipe(pipe_fds) == 0);
-    read_fd.reset(pipe_fds[0]);
-    write_fd.reset(pipe_fds[1]);
+struct TestPipe : Pipe {
+  TestPipe() : Pipe(Pipe::Create()) {
     // Make the pipe initially readable.
     Write();
   }
 
   void Read() {
     char b;
-    PERFETTO_DCHECK(read(read_fd.get(), &b, 1) == 1);
+    PERFETTO_DCHECK(read(*this->rd, &b, 1) == 1);
   }
 
   void Write() {
     const char b = '?';
-    PERFETTO_DCHECK(WriteAll(write_fd.get(), &b, 1) == 1);
+    PERFETTO_DCHECK(WriteAll(*this->wr, &b, 1) == 1);
   }
-
-  ScopedFile read_fd;
-  ScopedFile write_fd;
 };
 
 TYPED_TEST(TaskRunnerTest, PostImmediateTask) {
@@ -141,20 +135,20 @@
 
 TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  TestPipe pipe;
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
   task_runner.Run();
 }
 
 TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   bool watch_ran = false;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
-  task_runner.RemoveFileDescriptorWatch(pipe.read_fd.get());
+  task_runner.RemoveFileDescriptorWatch(pipe.rd.get());
   task_runner.PostDelayedTask([&task_runner] { task_runner.Quit(); }, 10);
   task_runner.Run();
 
@@ -163,13 +157,13 @@
 
 TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatchFromTask) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   bool watch_ran = false;
   task_runner.PostTask([&task_runner, &pipe] {
-    task_runner.RemoveFileDescriptorWatch(pipe.read_fd.get());
+    task_runner.RemoveFileDescriptorWatch(pipe.rd.get());
   });
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
   task_runner.PostDelayedTask([&task_runner] { task_runner.Quit(); }, 10);
   task_runner.Run();
@@ -179,30 +173,30 @@
 
 TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatchFromAnotherWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  Pipe pipe2;
+  TestPipe pipe;
+  TestPipe pipe2;
 
   task_runner.AddFileDescriptorWatch(
-      pipe.read_fd.get(), [&task_runner, &pipe, &pipe2] {
+      pipe.rd.get(), [&task_runner, &pipe, &pipe2] {
         pipe.Read();
         task_runner.AddFileDescriptorWatch(
-            pipe2.read_fd.get(), [&task_runner] { task_runner.Quit(); });
+            pipe2.rd.get(), [&task_runner] { task_runner.Quit(); });
       });
   task_runner.Run();
 }
 
 TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatchFromAnotherWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  Pipe pipe2;
+  TestPipe pipe;
+  TestPipe pipe2;
 
   bool watch_ran = false;
   task_runner.AddFileDescriptorWatch(
-      pipe.read_fd.get(), [&task_runner, &pipe, &pipe2] {
+      pipe.rd.get(), [&task_runner, &pipe, &pipe2] {
         pipe.Read();
-        task_runner.RemoveFileDescriptorWatch(pipe2.read_fd.get());
+        task_runner.RemoveFileDescriptorWatch(pipe2.rd.get());
       });
-  task_runner.AddFileDescriptorWatch(pipe2.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe2.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
   task_runner.PostDelayedTask([&task_runner] { task_runner.Quit(); }, 10);
   task_runner.Run();
@@ -212,17 +206,16 @@
 
 TYPED_TEST(TaskRunnerTest, ReplaceFileDescriptorWatchFromAnotherWatch) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
-  Pipe pipe2;
+  TestPipe pipe;
+  TestPipe pipe2;
 
   bool watch_ran = false;
-  task_runner.AddFileDescriptorWatch(
-      pipe.read_fd.get(), [&task_runner, &pipe2] {
-        task_runner.RemoveFileDescriptorWatch(pipe2.read_fd.get());
-        task_runner.AddFileDescriptorWatch(
-            pipe2.read_fd.get(), [&task_runner] { task_runner.Quit(); });
-      });
-  task_runner.AddFileDescriptorWatch(pipe2.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(), [&task_runner, &pipe2] {
+    task_runner.RemoveFileDescriptorWatch(pipe2.rd.get());
+    task_runner.AddFileDescriptorWatch(pipe2.rd.get(),
+                                       [&task_runner] { task_runner.Quit(); });
+  });
+  task_runner.AddFileDescriptorWatch(pipe2.rd.get(),
                                      [&watch_ran] { watch_ran = true; });
   task_runner.Run();
 
@@ -231,10 +224,10 @@
 
 TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatchFromAnotherThread) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   std::thread thread([&task_runner, &pipe] {
-    task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+    task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                        [&task_runner] { task_runner.Quit(); });
   });
   task_runner.Run();
@@ -243,10 +236,10 @@
 
 TYPED_TEST(TaskRunnerTest, FileDescriptorWatchWithMultipleEvents) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   int event_count = 0;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner, &pipe, &event_count] {
                                        if (++event_count == 3) {
                                          task_runner.Quit();
@@ -261,13 +254,9 @@
 
 TYPED_TEST(TaskRunnerTest, FileDescriptorClosedEvent) {
   auto& task_runner = this->task_runner;
-  int pipe_fds[2];
-  PERFETTO_DCHECK(pipe(pipe_fds) == 0);
-  ScopedFile read_fd(pipe_fds[0]);
-  ScopedFile write_fd(pipe_fds[1]);
-
-  write_fd.reset();
-  task_runner.AddFileDescriptorWatch(read_fd.get(),
+  TestPipe pipe;
+  pipe.wr.reset();
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
   task_runner.Run();
 }
@@ -305,9 +294,9 @@
 
 TYPED_TEST(TaskRunnerTest, FileDescriptorWatchesNotStarved) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
   task_runner.PostTask(std::bind(&RepeatingTask<TypeParam>, &task_runner));
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
   task_runner.Run();
 }
@@ -324,15 +313,14 @@
 
 TYPED_TEST(TaskRunnerTest, NoDuplicateFileDescriptorWatchCallbacks) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
   bool watch_called = 0;
   int counter = 10;
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
-                                     [&pipe, &watch_called] {
-                                       ASSERT_FALSE(watch_called);
-                                       pipe.Read();
-                                       watch_called = true;
-                                     });
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(), [&pipe, &watch_called] {
+    ASSERT_FALSE(watch_called);
+    pipe.Read();
+    watch_called = true;
+  });
   task_runner.PostTask(
       std::bind(&CountdownTask<TypeParam>, &task_runner, &counter));
   task_runner.Run();
@@ -340,16 +328,16 @@
 
 TYPED_TEST(TaskRunnerTest, ReplaceFileDescriptorWatchFromOtherThread) {
   auto& task_runner = this->task_runner;
-  Pipe pipe;
+  TestPipe pipe;
 
   // The two watch tasks here race each other. We don't particularly care which
   // wins as long as one of them runs.
-  task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+  task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                      [&task_runner] { task_runner.Quit(); });
 
   std::thread thread([&task_runner, &pipe] {
-    task_runner.RemoveFileDescriptorWatch(pipe.read_fd.get());
-    task_runner.AddFileDescriptorWatch(pipe.read_fd.get(),
+    task_runner.RemoveFileDescriptorWatch(pipe.rd.get());
+    task_runner.AddFileDescriptorWatch(pipe.rd.get(),
                                        [&task_runner] { task_runner.Quit(); });
   });
 
diff --git a/src/base/unix_socket_unittest.cc b/src/base/unix_socket_unittest.cc
index 9e59757..feff166 100644
--- a/src/base/unix_socket_unittest.cc
+++ b/src/base/unix_socket_unittest.cc
@@ -27,6 +27,7 @@
 #include "perfetto/base/build_config.h"
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/temp_file.h"
 #include "perfetto/base/utils.h"
 #include "src/base/test/test_task_runner.h"
@@ -333,9 +334,7 @@
 // the socket to the client. Both processes mmap the file in shared mode and
 // check that they see the same contents.
 TEST_F(UnixSocketTest, SharedMemory) {
-  int pipes[2];
-  ASSERT_EQ(0, pipe(pipes));
-
+  Pipe pipe = Pipe::Create();
   pid_t pid = fork();
   ASSERT_GE(pid, 0);
   constexpr size_t kTmpSize = 4096;
@@ -353,7 +352,7 @@
     auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
     ASSERT_TRUE(srv->is_listening());
     // Signal the other process that it can connect.
-    ASSERT_EQ(1, base::WriteAll(pipes[1], ".", 1));
+    ASSERT_EQ(1, base::WriteAll(*pipe.wr, ".", 1));
     auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_server");
     EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
         .WillOnce(Invoke(
@@ -373,7 +372,7 @@
     _exit(0);
   } else {
     char sync_cmd = '\0';
-    ASSERT_EQ(1, PERFETTO_EINTR(read(pipes[0], &sync_cmd, 1)));
+    ASSERT_EQ(1, PERFETTO_EINTR(read(*pipe.rd, &sync_cmd, 1)));
     ASSERT_EQ('.', sync_cmd);
     auto cli =
         UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
diff --git a/src/base/utils_unittest.cc b/src/base/utils_unittest.cc
index 1f556d6..4fb35bf 100644
--- a/src/base/utils_unittest.cc
+++ b/src/base/utils_unittest.cc
@@ -24,6 +24,7 @@
 #include "gtest/gtest.h"
 
 #include "perfetto/base/file_utils.h"
+#include "perfetto/base/pipe.h"
 
 namespace perfetto {
 namespace base {
@@ -58,10 +59,8 @@
   EXPECT_EQ(4u, ArraySize(bar_4));
 }
 
-int pipe_fd[2];
-
 TEST(UtilsTest, EintrWrapper) {
-  ASSERT_EQ(0, pipe(pipe_fd));
+  Pipe pipe = Pipe::Create();
 
   struct sigaction sa = {};
   struct sigaction old_sa = {};
@@ -81,21 +80,21 @@
   if (pid == 0 /* child */) {
     usleep(5000);
     kill(parent_pid, SIGUSR2);
-    ignore_result(WriteAll(pipe_fd[1], "foo\0", 4));
+    ignore_result(WriteAll(*pipe.wr, "foo\0", 4));
     _exit(0);
   }
 
   char buf[6] = {};
-  EXPECT_EQ(4, PERFETTO_EINTR(read(pipe_fd[0], buf, sizeof(buf))));
-  EXPECT_STREQ("foo", buf);
-  EXPECT_EQ(0, PERFETTO_EINTR(close(pipe_fd[0])));
-  EXPECT_EQ(0, PERFETTO_EINTR(close(pipe_fd[1])));
+  EXPECT_EQ(4, PERFETTO_EINTR(read(*pipe.rd, buf, sizeof(buf))));
+  EXPECT_EQ(0, PERFETTO_EINTR(close(*pipe.rd)));
+  pipe.wr.reset();
 
   // A 2nd close should fail with the proper errno.
-  int res = close(pipe_fd[0]);
+  int res = close(*pipe.rd);
   auto err = errno;
   EXPECT_EQ(-1, res);
   EXPECT_EQ(EBADF, err);
+  pipe.rd.release();
 
   // Restore the old handler.
   sigaction(SIGUSR2, &old_sa, nullptr);
diff --git a/src/perfetto_cmd/perfetto_cmd.cc b/src/perfetto_cmd/perfetto_cmd.cc
index f47d3d5..8131633 100644
--- a/src/perfetto_cmd/perfetto_cmd.cc
+++ b/src/perfetto_cmd/perfetto_cmd.cc
@@ -470,10 +470,7 @@
 
 void PerfettoCmd::SetupCtrlCSignalHandler() {
   // Setup the pipe used to deliver the CTRL-C notification from signal handler.
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(pipe_fds) == 0);
-  ctrl_c_pipe_rd_.reset(pipe_fds[0]);
-  ctrl_c_pipe_wr_.reset(pipe_fds[1]);
+  ctrl_c_pipe_ = base::Pipe::Create();
 
   // Setup signal handler.
   struct sigaction sa {};
@@ -494,7 +491,7 @@
   sigaction(SIGINT, &sa, nullptr);
 
   task_runner_.AddFileDescriptorWatch(
-      *ctrl_c_pipe_rd_, [this] { consumer_endpoint_->DisableTracing(); });
+      *ctrl_c_pipe_.rd, [this] { consumer_endpoint_->DisableTracing(); });
 }
 
 int __attribute__((visibility("default")))
diff --git a/src/perfetto_cmd/perfetto_cmd.h b/src/perfetto_cmd/perfetto_cmd.h
index f96b73f..972664f 100644
--- a/src/perfetto_cmd/perfetto_cmd.h
+++ b/src/perfetto_cmd/perfetto_cmd.h
@@ -24,6 +24,7 @@
 #include <time.h>
 
 #include "perfetto/base/build_config.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/unix_task_runner.h"
 #include "perfetto/tracing/core/consumer.h"
@@ -58,7 +59,7 @@
   void OnTracingDisabled() override;
   void OnTraceData(std::vector<TracePacket>, bool has_more) override;
 
-  int ctrl_c_pipe_wr() const { return *ctrl_c_pipe_wr_; }
+  int ctrl_c_pipe_wr() const { return *ctrl_c_pipe_.wr; }
 
  private:
   bool OpenOutputFile();
@@ -73,8 +74,7 @@
   std::unique_ptr<TraceConfig> trace_config_;
   base::ScopedFstream trace_out_stream_;
   std::string trace_out_path_;
-  base::ScopedFile ctrl_c_pipe_wr_;
-  base::ScopedFile ctrl_c_pipe_rd_;
+  base::Pipe ctrl_c_pipe_;
   std::string dropbox_tag_;
   bool did_process_full_trace_ = false;
   uint64_t bytes_written_ = 0;
diff --git a/src/traced/probes/ftrace/atrace_wrapper.cc b/src/traced/probes/ftrace/atrace_wrapper.cc
index 3132c66..e0e63f8 100644
--- a/src/traced/probes/ftrace/atrace_wrapper.cc
+++ b/src/traced/probes/ftrace/atrace_wrapper.cc
@@ -26,6 +26,7 @@
 #include <unistd.h>
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/time.h"
 
 namespace perfetto {
@@ -47,40 +48,34 @@
   argv.push_back(nullptr);
 
   // Create the pipe for the child process to return stderr.
-  int filedes[2];
-  PERFETTO_CHECK(pipe(filedes) == 0);
-
-  int err = fcntl(filedes[0], F_SETFL, fcntl(filedes[0], F_GETFL) | O_NONBLOCK);
-  PERFETTO_CHECK(err == 0);
+  base::Pipe err_pipe = base::Pipe::Create(base::Pipe::kRdNonBlock);
 
   pid_t pid = fork();
   PERFETTO_CHECK(pid >= 0);
   if (pid == 0) {
     // Duplicate the write end of the pipe into stderr.
-    if ((dup2(filedes[1], STDERR_FILENO) == -1)) {
+    if ((dup2(*err_pipe.wr, STDERR_FILENO) == -1)) {
       const char kError[] = "Unable to duplicate stderr fd";
-      base::ignore_result(write(filedes[1], kError, sizeof(kError)));
+      base::ignore_result(write(*err_pipe.wr, kError, sizeof(kError)));
       _exit(1);
     }
 
     // Close stdin/out + any file descriptor that we might have mistakenly
-    // not marked as FD_CLOEXEC.
+    // not marked as FD_CLOEXEC. |err_pipe| is FD_CLOEXEC and will be
+    // automatically closed on exec.
     for (int i = 0; i < 128; i++) {
       if (i != STDERR_FILENO)
         close(i);
     }
 
-    // Close the read and write end of the pipe fds.
-    close(filedes[1]);
-    close(filedes[0]);
-
     execv("/system/bin/atrace", &argv[0]);
+
     // Reached only if execv fails.
     _exit(1);
   }
 
   // Close the write end of the pipe.
-  close(filedes[1]);
+  err_pipe.wr.reset();
 
   // Collect the output from child process.
   char buffer[4096];
@@ -89,7 +84,7 @@
   // Get the read end of the pipe.
   constexpr uint8_t kFdCount = 1;
   struct pollfd fds[kFdCount]{};
-  fds[0].fd = filedes[0];
+  fds[0].fd = *err_pipe.rd;
   fds[0].events = POLLIN;
 
   // Store the start time of atrace and setup the timeout.
@@ -126,7 +121,7 @@
     }
 
     // Data is available to be read from the fd.
-    int64_t count = PERFETTO_EINTR(read(filedes[0], buffer, sizeof(buffer)));
+    int64_t count = PERFETTO_EINTR(read(*err_pipe.rd, buffer, sizeof(buffer)));
     if (ret < 0 && errno == EAGAIN) {
       continue;
     } else if (count < 0) {
@@ -139,9 +134,6 @@
     error.append(buffer, static_cast<size_t>(count));
   }
 
-  // Close the read end of the pipe.
-  close(filedes[0]);
-
   // Wait until the child process exits fully.
   PERFETTO_EINTR(waitpid(pid, &status, 0));
 
diff --git a/src/traced/probes/ftrace/cpu_reader.cc b/src/traced/probes/ftrace/cpu_reader.cc
index e607cf7..baa89aa 100644
--- a/src/traced/probes/ftrace/cpu_reader.cc
+++ b/src/traced/probes/ftrace/cpu_reader.cc
@@ -136,23 +136,16 @@
                      base::ScopedFile fd,
                      std::function<void()> on_data_available)
     : table_(table), cpu_(cpu), trace_fd_(std::move(fd)) {
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(&pipe_fds[0]) == 0);
-  staging_read_fd_.reset(pipe_fds[0]);
-  staging_write_fd_.reset(pipe_fds[1]);
+  // Both reads and writes from/to the staging pipe are always non-blocking.
+  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
+  // blocking vs non-blocking behavior is controlled solely by the
+  // SPLICE_F_NONBLOCK flag passed to splice().
+  staging_pipe_ = base::Pipe::Create(base::Pipe::kBothNonBlock);
 
   // Make reads from the raw pipe blocking so that splice() can sleep.
   PERFETTO_CHECK(trace_fd_);
   SetBlocking(*trace_fd_, true);
 
-  // Reads from the staging pipe are always non-blocking.
-  SetBlocking(*staging_read_fd_, false);
-
-  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
-  // blocking vs non-blocking behavior is controlled solely by the
-  // SPLICE_F_NONBLOCK flag passed to splice().
-  SetBlocking(*staging_write_fd_, false);
-
   // We need a non-default SIGPIPE handler to make it so that the blocking
   // splice() is woken up when the ~CpuReader() dtor destroys the pipes.
   // Just masking out the signal would cause an implicit syscall restart and
@@ -172,7 +165,7 @@
 
   worker_thread_ =
       std::thread(std::bind(&RunWorkerThread, cpu_, *trace_fd_,
-                            *staging_write_fd_, on_data_available, &cmd_));
+                            *staging_pipe_.wr, on_data_available, &cmd_));
 }
 
 CpuReader::~CpuReader() {
@@ -271,7 +264,7 @@
   for (;;) {
     uint8_t* buffer = GetBuffer();
     long bytes =
-        PERFETTO_EINTR(read(*staging_read_fd_, buffer, base::kPageSize));
+        PERFETTO_EINTR(read(*staging_pipe_.rd, buffer, base::kPageSize));
     if (bytes == -1 && errno == EAGAIN)
       break;
     PERFETTO_CHECK(static_cast<size_t>(bytes) == base::kPageSize);
diff --git a/src/traced/probes/ftrace/cpu_reader.h b/src/traced/probes/ftrace/cpu_reader.h
index bdd067f..7cedd50 100644
--- a/src/traced/probes/ftrace/cpu_reader.h
+++ b/src/traced/probes/ftrace/cpu_reader.h
@@ -28,6 +28,7 @@
 
 #include "perfetto/base/gtest_prod_util.h"
 #include "perfetto/base/paged_memory.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/thread_checker.h"
 #include "perfetto/protozero/message.h"
@@ -193,8 +194,7 @@
   const ProtoTranslationTable* const table_;
   const size_t cpu_;
   base::ScopedFile trace_fd_;
-  base::ScopedFile staging_read_fd_;
-  base::ScopedFile staging_write_fd_;
+  base::Pipe staging_pipe_;
   base::PagedMemory buffer_;
   std::thread worker_thread_;
   std::atomic<ThreadCtl> cmd_{kRun};
diff --git a/tools/ftrace_proto_gen/ftrace_proto_gen.cc b/tools/ftrace_proto_gen/ftrace_proto_gen.cc
index 4a1747b..7c6d4ac 100644
--- a/tools/ftrace_proto_gen/ftrace_proto_gen.cc
+++ b/tools/ftrace_proto_gen/ftrace_proto_gen.cc
@@ -25,6 +25,7 @@
 
 #include "perfetto/base/file_utils.h"
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/string_splitter.h"
 
 namespace perfetto {
@@ -43,30 +44,17 @@
   return haystack.find(needle) != std::string::npos;
 }
 
-int SetNonBlocking(int fd) {
-  int flags = fcntl(fd, F_GETFL, 0);
-  if (flags == -1)
-    return -1;
-  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
-}
-
 std::string RunClangFmt(const std::string& input) {
   std::string output;
   pid_t pid;
-  int input_pipes[2];
-  int output_pipes[2];
-  PERFETTO_CHECK(pipe(input_pipes) != -1);
-  PERFETTO_CHECK(SetNonBlocking(input_pipes[0]) != -1);
-  PERFETTO_CHECK(SetNonBlocking(input_pipes[1]) != -1);
-  PERFETTO_CHECK(pipe(output_pipes) != -1);
-  PERFETTO_CHECK(SetNonBlocking(output_pipes[0]) != -1);
-  PERFETTO_CHECK(SetNonBlocking(output_pipes[1]) != -1);
+  base::Pipe input_pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
+  base::Pipe output_pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
   if ((pid = fork()) == 0) {
     // Child
-    PERFETTO_CHECK(dup2(input_pipes[0], STDIN_FILENO) != -1);
-    PERFETTO_CHECK(dup2(output_pipes[1], STDOUT_FILENO) != -1);
-    close(input_pipes[1]);
-    close(output_pipes[0]);
+    PERFETTO_CHECK(dup2(*input_pipe.rd, STDIN_FILENO) != -1);
+    PERFETTO_CHECK(dup2(*output_pipe.wr, STDOUT_FILENO) != -1);
+    input_pipe.wr.reset();
+    output_pipe.rd.reset();
     PERFETTO_CHECK(execl("buildtools/linux64/clang-format", "clang-format",
                          nullptr) != -1);
   }
@@ -74,15 +62,15 @@
   // Parent
   size_t written = 0;
   size_t bytes_read = 0;
-  close(input_pipes[0]);
-  close(output_pipes[1]);
+  input_pipe.rd.reset();
+  output_pipe.wr.reset();
   // This cannot be left uninitialized because there's as continue statement
   // before the first assignment to this in the loop.
   ssize_t r = -1;
   do {
     if (written < input.size()) {
       ssize_t w =
-          write(input_pipes[1], &(input[written]), input.size() - written);
+          write(*input_pipe.wr, &(input[written]), input.size() - written);
       if (w == -1) {
         if (errno == EAGAIN || errno == EINTR)
           continue;
@@ -90,12 +78,12 @@
       }
       written += static_cast<size_t>(w);
       if (written == input.size())
-        close(input_pipes[1]);
+        input_pipe.wr.reset();
     }
 
     if (bytes_read + base::kPageSize > output.size())
       output.resize(output.size() + base::kPageSize);
-    r = read(output_pipes[0], &(output[bytes_read]), base::kPageSize);
+    r = read(*output_pipe.rd, &(output[bytes_read]), base::kPageSize);
     if (r == -1) {
       if (errno == EAGAIN || errno == EINTR)
         continue;
diff --git a/tools/pipestats.cc b/tools/pipestats.cc
index 6279efc..df83569 100644
--- a/tools/pipestats.cc
+++ b/tools/pipestats.cc
@@ -25,6 +25,7 @@
 #include <unistd.h>  // pipe
 
 #include "perfetto/base/logging.h"
+#include "perfetto/base/pipe.h"
 #include "perfetto/base/scoped_file.h"
 #include "perfetto/base/time.h"
 #include "perfetto/base/utils.h"
@@ -51,24 +52,17 @@
   PERFETTO_CHECK(trace_fd);
   std::thread reader(ReadLoop, trace_fd.get());
 
-  int pipe_fds[2];
-  PERFETTO_CHECK(pipe(&pipe_fds[0]) == 0);
-  base::ScopedFile staging_read_fd(pipe_fds[0]);
-  base::ScopedFile staging_write_fd(pipe_fds[1]);
+  // Reads from the staging pipe are always non-blocking.
+  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
+  // blocking vs non-blocking behavior is controlled solely by the
+  // SPLICE_F_NONBLOCK flag passed to splice().
+  base::Pipe pipe = base::Pipe::Create(base::Pipe::kBothNonBlock);
 
   // Make reads from the raw pipe blocking so that splice() can sleep.
   SetBlocking(*trace_fd, true);
 
-  // Reads from the staging pipe are always non-blocking.
-  SetBlocking(*staging_read_fd, false);
-
-  // Note: O_NONBLOCK seems to be ignored by splice() on the target pipe. The
-  // blocking vs non-blocking behavior is controlled solely by the
-  // SPLICE_F_NONBLOCK flag passed to splice().
-  SetBlocking(*staging_write_fd, false);
-
   for (;;) {
-    ssize_t splice_res = splice(*trace_fd, nullptr, *staging_write_fd, nullptr,
+    ssize_t splice_res = splice(*trace_fd, nullptr, *pipe.wr, nullptr,
                                 base::kPageSize, SPLICE_F_MOVE);
     if (splice_res > 0) {
       auto cur = base::GetWallTimeNs();