Alternative workaround for mac kernel bug.

BUG=298276

Committed: https://src.chromium.org/viewvc/chrome?view=rev&revision=227999

Review URL: https://codereview.chromium.org/25325002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@228569 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: dc875dc95290a9a5e4245e24e209a0496038f46c
diff --git a/ipc/ipc_send_fds_test.cc b/ipc/ipc_send_fds_test.cc
index 4cddc1c..20c3ed5 100644
--- a/ipc/ipc_send_fds_test.cc
+++ b/ipc/ipc_send_fds_test.cc
@@ -11,13 +11,18 @@
 }
 #endif
 #include <fcntl.h>
+#include <sys/socket.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include <queue>
+
+#include "base/callback.h"
 #include "base/file_descriptor_posix.h"
 #include "base/message_loop/message_loop.h"
 #include "base/pickle.h"
 #include "base/posix/eintr_wrapper.h"
+#include "base/synchronization/waitable_event.h"
 #include "ipc/ipc_message_utils.h"
 #include "ipc/ipc_test_base.h"
 
@@ -26,57 +31,67 @@
 const unsigned kNumFDsToSend = 20;
 const char* kDevZeroPath = "/dev/zero";
 
-static void VerifyAndCloseDescriptor(int fd, ino_t inode_num) {
-  // Check that we can read from the FD.
-  char buf;
-  ssize_t amt_read = read(fd, &buf, 1);
-  ASSERT_EQ(amt_read, 1);
-  ASSERT_EQ(buf, 0);  // /dev/zero always reads 0 bytes.
-
-  struct stat st;
-  ASSERT_EQ(fstat(fd, &st), 0);
-
-  ASSERT_EQ(close(fd), 0);
-
-  // Compare inode numbers to check that the file sent over the wire is actually
-  // the one expected.
-  ASSERT_EQ(inode_num, st.st_ino);
-}
-
-class MyChannelDescriptorListener : public IPC::Listener {
+class MyChannelDescriptorListenerBase : public IPC::Listener {
  public:
-  explicit MyChannelDescriptorListener(ino_t expected_inode_num)
-      : expected_inode_num_(expected_inode_num),
-        num_fds_received_(0) {}
-
   virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
     PickleIterator iter(message);
 
-    ++num_fds_received_;
     base::FileDescriptor descriptor;
 
     IPC::ParamTraits<base::FileDescriptor>::Read(&message, &iter, &descriptor);
 
-    VerifyAndCloseDescriptor(descriptor.fd, expected_inode_num_);
-    if (num_fds_received_ == kNumFDsToSend)
-      base::MessageLoop::current()->Quit();
-
+    HandleFD(descriptor.fd);
     return true;
   }
 
-  virtual void OnChannelError() OVERRIDE {
-    base::MessageLoop::current()->Quit();
+ protected:
+  virtual void HandleFD(int fd) = 0;
+};
+
+class MyChannelDescriptorListener : public MyChannelDescriptorListenerBase {
+ public:
+  explicit MyChannelDescriptorListener(ino_t expected_inode_num)
+      : MyChannelDescriptorListenerBase(),
+        expected_inode_num_(expected_inode_num),
+        num_fds_received_(0) {
   }
 
   bool GotExpectedNumberOfDescriptors() const {
     return num_fds_received_ == kNumFDsToSend;
   }
 
+  virtual void OnChannelError() OVERRIDE {
+    base::MessageLoop::current()->Quit();
+  }
+
+ protected:
+  virtual void HandleFD(int fd) OVERRIDE {
+    // Check that we can read from the FD.
+    char buf;
+    ssize_t amt_read = read(fd, &buf, 1);
+    ASSERT_EQ(amt_read, 1);
+    ASSERT_EQ(buf, 0);  // /dev/zero always reads 0 bytes.
+
+    struct stat st;
+    ASSERT_EQ(fstat(fd, &st), 0);
+
+    ASSERT_EQ(close(fd), 0);
+
+    // Compare inode numbers to check that the file sent over the wire is
+    // actually the one expected.
+    ASSERT_EQ(expected_inode_num_, st.st_ino);
+
+    ++num_fds_received_;
+    if (num_fds_received_ == kNumFDsToSend)
+      base::MessageLoop::current()->Quit();
+  }
+
  private:
   ino_t expected_inode_num_;
   unsigned num_fds_received_;
 };
 
+
 class IPCSendFdsTest : public IPCTestBase {
  protected:
   void RunServer() {
@@ -178,6 +193,188 @@
 }
 #endif  // defined(OS_MACOSX)
 
+
+class MyCBListener : public MyChannelDescriptorListenerBase {
+ public:
+  MyCBListener(base::Callback<void(int)> cb, int fds_to_send)
+      : MyChannelDescriptorListenerBase(),
+        cb_(cb) {
+    }
+
+ protected:
+  virtual void HandleFD(int fd) OVERRIDE {
+    cb_.Run(fd);
+  }
+ private:
+  base::Callback<void(int)> cb_;
+};
+
+std::pair<int, int> make_socket_pair() {
+  int pipe_fds[2];
+  CHECK_EQ(0, HANDLE_EINTR(socketpair(AF_UNIX, SOCK_STREAM, 0, pipe_fds)));
+  return std::pair<int, int>(pipe_fds[0], pipe_fds[1]);
+}
+
+static void null_cb(int unused_fd) {
+  NOTREACHED();
+}
+
+class PipeChannelHelper {
+ public:
+  PipeChannelHelper(base::Thread* in_thread,
+                    base::Thread* out_thread,
+                    base::Callback<void(int)> cb,
+                    int fds_to_send) :
+      in_thread_(in_thread),
+      out_thread_(out_thread),
+      cb_listener_(cb, fds_to_send),
+      null_listener_(base::Bind(&null_cb), 0) {
+  }
+
+  void Init() {
+    IPC::ChannelHandle in_handle("IN");
+    in.reset(new IPC::Channel(in_handle,
+                              IPC::Channel::MODE_SERVER,
+                              &null_listener_));
+    base::FileDescriptor out_fd(in->TakeClientFileDescriptor(), false);
+    IPC::ChannelHandle out_handle("OUT", out_fd);
+    out.reset(new IPC::Channel(out_handle,
+                               IPC::Channel::MODE_CLIENT,
+                               &cb_listener_));
+    // PostTask the connect calls to make sure the callbacks happens
+    // on the right threads.
+    in_thread_->message_loop()->PostTask(
+        FROM_HERE,
+        base::Bind(&PipeChannelHelper::Connect, in.get()));
+    out_thread_->message_loop()->PostTask(
+        FROM_HERE,
+        base::Bind(&PipeChannelHelper::Connect, out.get()));
+  }
+
+  static void DestroyChannel(scoped_ptr<IPC::Channel> *c,
+                             base::WaitableEvent *event) {
+    c->reset(0);
+    event->Signal();
+  }
+
+  ~PipeChannelHelper() {
+    base::WaitableEvent a(true, false);
+    base::WaitableEvent b(true, false);
+    in_thread_->message_loop()->PostTask(
+        FROM_HERE,
+        base::Bind(&PipeChannelHelper::DestroyChannel, &in, &a));
+    out_thread_->message_loop()->PostTask(
+        FROM_HERE,
+        base::Bind(&PipeChannelHelper::DestroyChannel, &out, &b));
+    a.Wait();
+    b.Wait();
+  }
+
+  static void Connect(IPC::Channel *channel) {
+    EXPECT_TRUE(channel->Connect());
+  }
+
+  void Send(int fd) {
+    CHECK_EQ(base::MessageLoop::current(), in_thread_->message_loop());
+
+    ASSERT_GE(fd, 0);
+    base::FileDescriptor descriptor(fd, true);
+
+    IPC::Message* message =
+        new IPC::Message(0, 3, IPC::Message::PRIORITY_NORMAL);
+    IPC::ParamTraits<base::FileDescriptor>::Write(message, descriptor);
+    ASSERT_TRUE(in->Send(message));
+  }
+
+ private:
+  scoped_ptr<IPC::Channel> in, out;
+  base::Thread* in_thread_;
+  base::Thread* out_thread_;
+  MyCBListener cb_listener_;
+  MyCBListener null_listener_;
+};
+
+// This test is meant to provoke a kernel bug on OSX, and to prove
+// that the workaround for it is working. It sets up two pipes and three
+// threads, the producer thread creates socketpairs and sends one of the fds
+// over pipe1 to the middleman thread. The middleman thread simply takes the fd
+// sends it over pipe2 to the consumer thread. The consumer thread writes a byte
+// to each fd it receives and then closes the pipe. The producer thread reads
+// the bytes back from each pair of pipes and make sure that everything worked.
+// This feedback mechanism makes sure that not too many file descriptors are
+// in flight at the same time. For more info on the bug, see:
+// http://crbug.com/298276
+class IPCMultiSendingFdsTest : public testing::Test {
+ public:
+  IPCMultiSendingFdsTest() : received_(true, false) {}
+
+  void Producer(PipeChannelHelper* dest,
+                base::Thread* t,
+                int pipes_to_send) {
+    for (int i = 0; i < pipes_to_send; i++) {
+      received_.Reset();
+      std::pair<int, int> pipe_fds = make_socket_pair();
+      t->message_loop()->PostTask(
+          FROM_HERE,
+          base::Bind(&PipeChannelHelper::Send,
+                     base::Unretained(dest),
+                     pipe_fds.second));
+      char tmp = 'x';
+      CHECK_EQ(1, HANDLE_EINTR(write(pipe_fds.first, &tmp, 1)));
+      CHECK_EQ(0, HANDLE_EINTR(close(pipe_fds.first)));
+      received_.Wait();
+    }
+  }
+
+  void ConsumerHandleFD(int fd) {
+    char tmp = 'y';
+    CHECK_EQ(1, HANDLE_EINTR(read(fd, &tmp, 1)));
+    CHECK_EQ(tmp, 'x');
+    CHECK_EQ(0, HANDLE_EINTR(close(fd)));
+    received_.Signal();
+  }
+
+  base::Thread* CreateThread(const char* name) {
+    base::Thread* ret = new base::Thread(name);
+    base::Thread::Options options;
+    options.message_loop_type = base::MessageLoop::TYPE_IO;
+    ret->StartWithOptions(options);
+    return ret;
+  }
+
+  void Run() {
+    // On my mac, this test fails roughly 35 times per
+    // million sends with low load, but much more with high load.
+    // Unless the workaround is in place. With 10000 sends, we
+    // should see at least a 3% failure rate.
+    const int pipes_to_send = 20000;
+    scoped_ptr<base::Thread> producer(CreateThread("producer"));
+    scoped_ptr<base::Thread> middleman(CreateThread("middleman"));
+    scoped_ptr<base::Thread> consumer(CreateThread("consumer"));
+    PipeChannelHelper pipe1(
+        middleman.get(),
+        consumer.get(),
+        base::Bind(&IPCMultiSendingFdsTest::ConsumerHandleFD,
+                   base::Unretained(this)),
+        pipes_to_send);
+    PipeChannelHelper pipe2(
+        producer.get(),
+        middleman.get(),
+        base::Bind(&PipeChannelHelper::Send, base::Unretained(&pipe1)),
+        pipes_to_send);
+    pipe1.Init();
+    pipe2.Init();
+    Producer(&pipe2, producer.get(), pipes_to_send);
+  }
+
+ private:
+  base::WaitableEvent received_;
+};
+
+TEST_F(IPCMultiSendingFdsTest, StressTest) {
+  Run();
+}
+
 }  // namespace
 
 #endif  // defined(OS_POSIX)