Introduce ChannelMojo

This CL introduces ChannelMojo IPC::Channel implementation
and optionally applies it for renderer-browser IPC channel.

Current stability is like 5-seconds browser and There are rough edges.
It often closes the channel so needs to be more robust.
Even though the level of stability, having it in the tree will helps
team to try and improve it.

BUG=377980
R=darin@chromium.org,jam@chromium.org,viettrungluu@chromium.org
TEST=ipc_channel_mojo_unittest.cc

Review URL: https://codereview.chromium.org/382333002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@287402 0039d316-1c4b-4281-b951-d872f2087c98


CrOS-Libchrome-Original-Commit: 6486088e8bb6dc810157503edfa3c75a58e9e49d
diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h
index bca3ea0..fbd4ae0 100644
--- a/ipc/ipc_channel.h
+++ b/ipc/ipc_channel.h
@@ -170,6 +170,14 @@
   // listener.
   virtual base::ProcessId GetPeerPID() const = 0;
 
+  // Get its own process id. This value is told to the peer.
+  virtual base::ProcessId GetSelfPID() const = 0;
+
+  // Return connected ChannelHandle which the channel has owned.
+  // This method transfers the ownership to the caller
+  // so the channel isn't valid after the call.
+  virtual ChannelHandle TakePipeHandle() WARN_UNUSED_RESULT = 0;
+
   // Send a message over the Channel to the listener on the other end.
   //
   // |message| must be allocated using operator new.  This object will be
diff --git a/ipc/ipc_channel_factory.cc b/ipc/ipc_channel_factory.cc
new file mode 100644
index 0000000..4cb1790
--- /dev/null
+++ b/ipc/ipc_channel_factory.cc
@@ -0,0 +1,42 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/ipc_channel_factory.h"
+
+namespace IPC {
+
+namespace {
+
+class PlatformChannelFactory : public ChannelFactory {
+ public:
+  PlatformChannelFactory(ChannelHandle handle,
+                         Channel::Mode mode)
+      : handle_(handle), mode_(mode) {
+  }
+
+  virtual std::string GetName() const OVERRIDE {
+    return handle_.name;
+  }
+
+  virtual scoped_ptr<Channel> BuildChannel(
+      Listener* listener) OVERRIDE {
+    return Channel::Create(handle_, mode_, listener);
+  }
+
+ private:
+  ChannelHandle handle_;
+  Channel::Mode mode_;
+
+  DISALLOW_COPY_AND_ASSIGN(PlatformChannelFactory);
+};
+
+} // namespace
+
+// static
+scoped_ptr<ChannelFactory> ChannelFactory::Create(
+    const ChannelHandle& handle, Channel::Mode mode) {
+  return scoped_ptr<ChannelFactory>(new PlatformChannelFactory(handle, mode));
+}
+
+}  // namespace IPC
diff --git a/ipc/ipc_channel_factory.h b/ipc/ipc_channel_factory.h
new file mode 100644
index 0000000..30bfd8c
--- /dev/null
+++ b/ipc/ipc_channel_factory.h
@@ -0,0 +1,33 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_CHANNEL_FACTORY_H_
+#define IPC_IPC_CHANNEL_FACTORY_H_
+
+#include <string>
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "ipc/ipc_channel.h"
+
+namespace IPC {
+
+// Encapsulates how a Channel is created. A ChannelFactory can be
+// passed to the constructor of ChannelProxy or SyncChannel to tell them
+// how to create underlying channel.
+class ChannelFactory {
+ public:
+  // Creates a factory for "native" channel built through
+  // IPC::Channel::Create().
+  static scoped_ptr<ChannelFactory> Create(
+      const ChannelHandle& handle, Channel::Mode mode);
+
+  virtual ~ChannelFactory() { }
+  virtual std::string GetName() const = 0;
+  virtual scoped_ptr<Channel> BuildChannel(Listener* listener) = 0;
+};
+
+}  // namespace IPC
+
+#endif  // IPC_IPC_CHANNEL_FACTORY_H_
diff --git a/ipc/ipc_channel_posix.cc b/ipc/ipc_channel_posix.cc
index 8ddf73a..ac9de55 100644
--- a/ipc/ipc_channel_posix.cc
+++ b/ipc/ipc_channel_posix.cc
@@ -747,7 +747,7 @@
   }
 }
 
-int ChannelPosix::GetHelloMessageProcId() {
+int ChannelPosix::GetHelloMessageProcId() const {
   int pid = base::GetCurrentProcId();
 #if defined(OS_LINUX)
   // Our process may be in a sandbox with a separate PID namespace.
@@ -1050,6 +1050,17 @@
   return peer_pid_;
 }
 
+base::ProcessId ChannelPosix::GetSelfPID() const {
+  return GetHelloMessageProcId();
+}
+
+ChannelHandle ChannelPosix::TakePipeHandle() {
+  ChannelHandle handle = ChannelHandle(pipe_name_,
+                                       base::FileDescriptor(pipe_, false));
+  pipe_ = -1;
+  return handle;
+}
+
 //------------------------------------------------------------------------------
 // Channel's methods
 
diff --git a/ipc/ipc_channel_posix.h b/ipc/ipc_channel_posix.h
index 7f17b2f..a235739 100644
--- a/ipc/ipc_channel_posix.h
+++ b/ipc/ipc_channel_posix.h
@@ -62,6 +62,8 @@
   virtual void Close() OVERRIDE;
   virtual bool Send(Message* message) OVERRIDE;
   virtual base::ProcessId GetPeerPID() const OVERRIDE;
+  virtual base::ProcessId GetSelfPID() const OVERRIDE;
+  virtual ChannelHandle TakePipeHandle() OVERRIDE;
   virtual int GetClientFileDescriptor() const OVERRIDE;
   virtual int TakeClientFileDescriptor() OVERRIDE;
 
@@ -94,7 +96,7 @@
 
   bool AcceptConnection();
   void ClosePipeOnError();
-  int GetHelloMessageProcId();
+  int GetHelloMessageProcId() const;
   void QueueHelloMessage();
   void CloseFileDescriptors(Message* msg);
   void QueueCloseFDMessage(int fd, int hops);
diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc
index 7441c65..2ea722d 100644
--- a/ipc/ipc_channel_proxy.cc
+++ b/ipc/ipc_channel_proxy.cc
@@ -11,6 +11,7 @@
 #include "base/memory/scoped_ptr.h"
 #include "base/single_thread_task_runner.h"
 #include "base/thread_task_runner_handle.h"
+#include "ipc/ipc_channel_factory.h"
 #include "ipc/ipc_listener.h"
 #include "ipc/ipc_logging.h"
 #include "ipc/ipc_message_macros.h"
@@ -48,11 +49,10 @@
   ipc_task_runner_ = NULL;
 }
 
-void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
-                                          const Channel::Mode& mode) {
+void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
   DCHECK(!channel_);
-  channel_id_ = handle.name;
-  channel_ = Channel::Create(handle, mode, this);
+  channel_id_ = factory->GetName();
+  channel_ = factory->BuildChannel(this);
 }
 
 bool ChannelProxy::Context::TryFilters(const Message& message) {
@@ -315,6 +315,16 @@
   return channel.Pass();
 }
 
+// static
+scoped_ptr<ChannelProxy> ChannelProxy::Create(
+    scoped_ptr<ChannelFactory> factory,
+    Listener* listener,
+    base::SingleThreadTaskRunner* ipc_task_runner) {
+  scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
+  channel->Init(factory.Pass(), true);
+  return channel.Pass();
+}
+
 ChannelProxy::ChannelProxy(Context* context)
     : context_(context),
       did_init_(false) {
@@ -334,8 +344,6 @@
 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
                         Channel::Mode mode,
                         bool create_pipe_now) {
-  DCHECK(CalledOnValidThread());
-  DCHECK(!did_init_);
 #if defined(OS_POSIX)
   // When we are creating a server on POSIX, we need its file descriptor
   // to be created immediately so that it can be accessed and passed
@@ -345,17 +353,25 @@
     create_pipe_now = true;
   }
 #endif  // defined(OS_POSIX)
+  Init(ChannelFactory::Create(channel_handle, mode),
+       create_pipe_now);
+}
+
+void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
+                        bool create_pipe_now) {
+  DCHECK(CalledOnValidThread());
+  DCHECK(!did_init_);
 
   if (create_pipe_now) {
     // Create the channel immediately.  This effectively sets up the
     // low-level pipe so that the client can connect.  Without creating
     // the pipe immediately, it is possible for a listener to attempt
     // to connect and get an error since the pipe doesn't exist yet.
-    context_->CreateChannel(channel_handle, mode);
+    context_->CreateChannel(factory.Pass());
   } else {
     context_->ipc_task_runner()->PostTask(
-        FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
-                              channel_handle, mode));
+        FROM_HERE, base::Bind(&Context::CreateChannel,
+                              context_.get(), Passed(factory.Pass())));
   }
 
   // complete initialization on the background thread
diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h
index 2e483ac..e76e56f 100644
--- a/ipc/ipc_channel_proxy.h
+++ b/ipc/ipc_channel_proxy.h
@@ -22,6 +22,7 @@
 
 namespace IPC {
 
+class ChannelFactory;
 class MessageFilter;
 class MessageFilterRouter;
 class SendCallbackHelper;
@@ -70,6 +71,11 @@
       Listener* listener,
       base::SingleThreadTaskRunner* ipc_task_runner);
 
+  static scoped_ptr<ChannelProxy> Create(
+      scoped_ptr<ChannelFactory> factory,
+      Listener* listener,
+      base::SingleThreadTaskRunner* ipc_task_runner);
+
   virtual ~ChannelProxy();
 
   // Initializes the channel proxy. Only call this once to initialize a channel
@@ -78,6 +84,7 @@
   // thread.
   void Init(const IPC::ChannelHandle& channel_handle, Channel::Mode mode,
             bool create_pipe_now);
+  void Init(scoped_ptr<ChannelFactory> factory, bool create_pipe_now);
 
   // Close the IPC::Channel.  This operation completes asynchronously, once the
   // background thread processes the command to close the channel.  It is ok to
@@ -171,8 +178,7 @@
     friend class SendCallbackHelper;
 
     // Create the Channel
-    void CreateChannel(const IPC::ChannelHandle& channel_handle,
-                       const Channel::Mode& mode);
+    void CreateChannel(scoped_ptr<ChannelFactory> factory);
 
     // Methods called on the IO thread.
     void OnSendMessage(scoped_ptr<Message> message_ptr);
diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc
index 9a3cc3c..28a889a 100644
--- a/ipc/ipc_channel_reader.cc
+++ b/ipc/ipc_channel_reader.cc
@@ -38,13 +38,13 @@
   return DispatchInputData(input_buf_, bytes_read);
 }
 
-bool ChannelReader::IsInternalMessage(const Message& m) const {
+bool ChannelReader::IsInternalMessage(const Message& m) {
   return m.routing_id() == MSG_ROUTING_NONE &&
       m.type() >= Channel::CLOSE_FD_MESSAGE_TYPE &&
       m.type() <= Channel::HELLO_MESSAGE_TYPE;
 }
 
-bool ChannelReader::IsHelloMessage(const Message& m) const {
+bool ChannelReader::IsHelloMessage(const Message& m) {
   return m.routing_id() == MSG_ROUTING_NONE &&
       m.type() == Channel::HELLO_MESSAGE_TYPE;
 }
diff --git a/ipc/ipc_channel_reader.h b/ipc/ipc_channel_reader.h
index 1303846..9dec8c1 100644
--- a/ipc/ipc_channel_reader.h
+++ b/ipc/ipc_channel_reader.h
@@ -7,6 +7,7 @@
 
 #include "base/basictypes.h"
 #include "ipc/ipc_channel.h"
+#include "ipc/ipc_export.h"
 
 namespace IPC {
 namespace internal {
@@ -44,11 +45,11 @@
 
   // Returns true if the given message is internal to the IPC implementation,
   // like the "hello" message sent on channel set-up.
-  bool IsInternalMessage(const Message& m) const;
+  bool IsInternalMessage(const Message& m);
 
   // Returns true if the given message is an Hello message
   // sent on channel set-up.
-  bool IsHelloMessage(const Message& m) const;
+  bool IsHelloMessage(const Message& m);
 
  protected:
   enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING };
diff --git a/ipc/ipc_channel_unittest.cc b/ipc/ipc_channel_unittest.cc
index b9665db..1f85311 100644
--- a/ipc/ipc_channel_unittest.cc
+++ b/ipc/ipc_channel_unittest.cc
@@ -15,76 +15,10 @@
 #include "base/threading/thread.h"
 #include "ipc/ipc_message.h"
 #include "ipc/ipc_test_base.h"
+#include "ipc/ipc_test_channel_listener.h"
 
 namespace {
 
-const size_t kLongMessageStringNumBytes = 50000;
-
-static void Send(IPC::Sender* sender, const char* text) {
-  static int message_index = 0;
-
-  IPC::Message* message = new IPC::Message(0,
-                                           2,
-                                           IPC::Message::PRIORITY_NORMAL);
-  message->WriteInt(message_index++);
-  message->WriteString(std::string(text));
-
-  // Make sure we can handle large messages.
-  char junk[kLongMessageStringNumBytes];
-  memset(junk, 'a', sizeof(junk)-1);
-  junk[sizeof(junk)-1] = 0;
-  message->WriteString(std::string(junk));
-
-  // DEBUG: printf("[%u] sending message [%s]\n", GetCurrentProcessId(), text);
-  sender->Send(message);
-}
-
-// A generic listener that expects messages of a certain type (see
-// OnMessageReceived()), and either sends a generic response or quits after the
-// 50th message (or on channel error).
-class GenericChannelListener : public IPC::Listener {
- public:
-  GenericChannelListener() : sender_(NULL), messages_left_(50) {}
-  virtual ~GenericChannelListener() {}
-
-  virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
-    PickleIterator iter(message);
-
-    int ignored;
-    EXPECT_TRUE(iter.ReadInt(&ignored));
-    std::string data;
-    EXPECT_TRUE(iter.ReadString(&data));
-    std::string big_string;
-    EXPECT_TRUE(iter.ReadString(&big_string));
-    EXPECT_EQ(kLongMessageStringNumBytes - 1, big_string.length());
-
-    SendNextMessage();
-    return true;
-  }
-
-  virtual void OnChannelError() OVERRIDE {
-    // There is a race when closing the channel so the last message may be lost.
-    EXPECT_LE(messages_left_, 1);
-    base::MessageLoop::current()->Quit();
-  }
-
-  void Init(IPC::Sender* s) {
-    sender_ = s;
-  }
-
- protected:
-  void SendNextMessage() {
-    if (--messages_left_ <= 0)
-      base::MessageLoop::current()->Quit();
-    else
-      Send(sender_, "Foo");
-  }
-
- private:
-  IPC::Sender* sender_;
-  int messages_left_;
-};
-
 class IPCChannelTest : public IPCTestBase {
 };
 
@@ -124,13 +58,13 @@
   Init("GenericClient");
 
   // Set up IPC channel and start client.
-  GenericChannelListener listener;
+  IPC::TestChannelListener listener;
   CreateChannel(&listener);
   listener.Init(sender());
   ASSERT_TRUE(ConnectChannel());
   ASSERT_TRUE(StartClient());
 
-  Send(sender(), "hello from parent");
+  IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
 
   // Run message loop.
   base::MessageLoop::current()->Run();
@@ -149,7 +83,7 @@
 
   // Create pipe manually using the standard Chromium name and set up IPC
   // channel.
-  GenericChannelListener listener;
+  IPC::TestChannelListener listener;
   std::string name("\\\\.\\pipe\\chrome.");
   name.append(GetChannelName("GenericClient"));
   HANDLE pipe = CreateNamedPipeA(name.c_str(),
@@ -169,7 +103,7 @@
   ASSERT_TRUE(ConnectChannel());
   ASSERT_TRUE(StartClient());
 
-  Send(sender(), "hello from parent");
+  IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
 
   // Run message loop.
   base::MessageLoop::current()->Run();
@@ -191,13 +125,13 @@
   thread.StartWithOptions(options);
 
   // Set up IPC channel proxy.
-  GenericChannelListener listener;
+  IPC::TestChannelListener listener;
   CreateChannelProxy(&listener, thread.message_loop_proxy().get());
   listener.Init(sender());
 
   ASSERT_TRUE(StartClient());
 
-  Send(sender(), "hello from parent");
+  IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
 
   // Run message loop.
   base::MessageLoop::current()->Run();
@@ -209,7 +143,7 @@
   thread.Stop();
 }
 
-class ChannelListenerWithOnConnectedSend : public GenericChannelListener {
+class ChannelListenerWithOnConnectedSend : public IPC::TestChannelListener {
  public:
   ChannelListenerWithOnConnectedSend() {}
   virtual ~ChannelListenerWithOnConnectedSend() {}
@@ -237,7 +171,7 @@
   ASSERT_TRUE(ConnectChannel());
   ASSERT_TRUE(StartClient());
 
-  Send(sender(), "hello from parent");
+  IPC::TestChannelListener::SendOneMessage(sender(), "hello from parent");
 
   // Run message loop.
   base::MessageLoop::current()->Run();
@@ -251,7 +185,7 @@
 
 MULTIPROCESS_IPC_TEST_CLIENT_MAIN(GenericClient) {
   base::MessageLoopForIO main_message_loop;
-  GenericChannelListener listener;
+  IPC::TestChannelListener listener;
 
   // Set up IPC channel.
   scoped_ptr<IPC::Channel> channel(IPC::Channel::CreateClient(
@@ -259,7 +193,7 @@
       &listener));
   CHECK(channel->Connect());
   listener.Init(channel.get());
-  Send(channel.get(), "hello from child");
+  IPC::TestChannelListener::SendOneMessage(channel.get(), "hello from child");
 
   base::MessageLoop::current()->Run();
   return 0;
diff --git a/ipc/ipc_export.h b/ipc/ipc_export.h
index 776b3ee..9f8411c 100644
--- a/ipc/ipc_export.h
+++ b/ipc/ipc_export.h
@@ -17,16 +17,31 @@
 #define IPC_EXPORT __declspec(dllimport)
 #endif  // defined(IPC_IMPLEMENTATION)
 
+#if defined(IPC_MOJO_IMPLEMENTATION)
+#define IPC_MOJO_EXPORT __declspec(dllexport)
+#else
+#define IPC_MOJO_EXPORT __declspec(dllimport)
+#endif  // defined(IPC_MOJO_IMPLEMENTATION)
+
 #else  // defined(WIN32)
+
 #if defined(IPC_IMPLEMENTATION)
 #define IPC_EXPORT __attribute__((visibility("default")))
 #else
 #define IPC_EXPORT
 #endif
+
+#if defined(IPC_MOJO_IMPLEMENTATION)
+#define IPC_MOJO_EXPORT __attribute__((visibility("default")))
+#else
+#define IPC_MOJO_EXPORT
+#endif
+
 #endif
 
 #else  // defined(COMPONENT_BUILD)
 #define IPC_EXPORT
+#define IPC_MOJO_EXPORT
 #endif
 
 #endif  // IPC_IPC_EXPORT_H_
diff --git a/ipc/ipc_message.h b/ipc/ipc_message.h
index fc37d72..198e6c0 100644
--- a/ipc/ipc_message.h
+++ b/ipc/ipc_message.h
@@ -222,6 +222,7 @@
 
  protected:
   friend class Channel;
+  friend class ChannelMojo;
   friend class ChannelNacl;
   friend class ChannelPosix;
   friend class ChannelWin;
diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc
index a7ed230..0c4702c 100644
--- a/ipc/ipc_sync_channel.cc
+++ b/ipc/ipc_sync_channel.cc
@@ -13,6 +13,7 @@
 #include "base/synchronization/waitable_event_watcher.h"
 #include "base/thread_task_runner_handle.h"
 #include "base/threading/thread_local.h"
+#include "ipc/ipc_channel_factory.h"
 #include "ipc/ipc_logging.h"
 #include "ipc/ipc_message_macros.h"
 #include "ipc/ipc_sync_message.h"
@@ -420,6 +421,19 @@
 
 // static
 scoped_ptr<SyncChannel> SyncChannel::Create(
+    scoped_ptr<ChannelFactory> factory,
+    Listener* listener,
+    base::SingleThreadTaskRunner* ipc_task_runner,
+    bool create_pipe_now,
+    base::WaitableEvent* shutdown_event) {
+  scoped_ptr<SyncChannel> channel =
+      Create(listener, ipc_task_runner, shutdown_event);
+  channel->Init(factory.Pass(), create_pipe_now);
+  return channel.Pass();
+}
+
+// static
+scoped_ptr<SyncChannel> SyncChannel::Create(
     Listener* listener,
     base::SingleThreadTaskRunner* ipc_task_runner,
     WaitableEvent* shutdown_event) {
diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h
index 8984184..f8207ce 100644
--- a/ipc/ipc_sync_channel.h
+++ b/ipc/ipc_sync_channel.h
@@ -23,6 +23,7 @@
 namespace IPC {
 
 class SyncMessage;
+class ChannelFactory;
 
 // This is similar to ChannelProxy, with the added feature of supporting sending
 // synchronous messages.
@@ -75,6 +76,13 @@
       bool create_pipe_now,
       base::WaitableEvent* shutdown_event);
 
+  static scoped_ptr<SyncChannel> Create(
+      scoped_ptr<ChannelFactory> factory,
+      Listener* listener,
+      base::SingleThreadTaskRunner* ipc_task_runner,
+      bool create_pipe_now,
+      base::WaitableEvent* shutdown_event);
+
   // Creates an uninitialized sync channel. Call ChannelProxy::Init to
   // initialize the channel. This two-step setup allows message filters to be
   // added before any messages are sent or received.
diff --git a/ipc/ipc_test_base.cc b/ipc/ipc_test_base.cc
index 589ee98..f893c28 100644
--- a/ipc/ipc_test_base.cc
+++ b/ipc/ipc_test_base.cc
@@ -59,6 +59,15 @@
   return channel_->Connect();
 }
 
+scoped_ptr<IPC::Channel> IPCTestBase::ReleaseChannel() {
+  return channel_.Pass();
+}
+
+void IPCTestBase::SetChannel(scoped_ptr<IPC::Channel> channel) {
+  channel_ = channel.Pass();
+}
+
+
 void IPCTestBase::DestroyChannel() {
   DCHECK(channel_.get());
   channel_.reset();
@@ -120,3 +129,7 @@
   client_process_ = base::kNullProcessHandle;
   return rv;
 }
+
+scoped_refptr<base::TaskRunner> IPCTestBase::io_thread_task_runner() {
+  return message_loop_->message_loop_proxy();
+}
diff --git a/ipc/ipc_test_base.h b/ipc/ipc_test_base.h
index 5bd3e96..ce3328a 100644
--- a/ipc/ipc_test_base.h
+++ b/ipc/ipc_test_base.h
@@ -47,6 +47,11 @@
   bool ConnectChannel();
   void DestroyChannel();
 
+  // Releases or replaces existing channel.
+  // These are useful for testing specific types of channel subclasses.
+  scoped_ptr<IPC::Channel> ReleaseChannel();
+  void SetChannel(scoped_ptr<IPC::Channel> channel);
+
   // Use this instead of CreateChannel() if you want to use some different
   // channel specification (then use ConnectChannel() as usual).
   void CreateChannelFromChannelHandle(const IPC::ChannelHandle& channel_handle,
@@ -81,6 +86,7 @@
   IPC::ChannelProxy* channel_proxy() { return channel_proxy_.get(); }
 
   const base::ProcessHandle& client_process() const { return client_process_; }
+  scoped_refptr<base::TaskRunner> io_thread_task_runner();
 
  private:
   std::string test_client_name_;
diff --git a/ipc/ipc_test_channel_listener.cc b/ipc/ipc_test_channel_listener.cc
new file mode 100644
index 0000000..e98f6b7
--- /dev/null
+++ b/ipc/ipc_test_channel_listener.cc
@@ -0,0 +1,62 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/ipc_test_channel_listener.h"
+
+#include "ipc/ipc_message.h"
+#include "ipc/ipc_sender.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace IPC {
+
+// static
+void TestChannelListener::SendOneMessage(IPC::Sender* sender,
+                                         const char* text) {
+  static int message_index = 0;
+
+  IPC::Message* message = new IPC::Message(0,
+                                           2,
+                                           IPC::Message::PRIORITY_NORMAL);
+  message->WriteInt(message_index++);
+  message->WriteString(std::string(text));
+
+  // Make sure we can handle large messages.
+  char junk[kLongMessageStringNumBytes];
+  memset(junk, 'a', sizeof(junk)-1);
+  junk[sizeof(junk)-1] = 0;
+  message->WriteString(std::string(junk));
+
+  sender->Send(message);
+}
+
+
+bool TestChannelListener::OnMessageReceived(const IPC::Message& message) {
+  PickleIterator iter(message);
+
+  int ignored;
+  EXPECT_TRUE(iter.ReadInt(&ignored));
+  std::string data;
+  EXPECT_TRUE(iter.ReadString(&data));
+  std::string big_string;
+  EXPECT_TRUE(iter.ReadString(&big_string));
+  EXPECT_EQ(kLongMessageStringNumBytes - 1, big_string.length());
+
+  SendNextMessage();
+  return true;
+}
+
+void TestChannelListener::OnChannelError() {
+  // There is a race when closing the channel so the last message may be lost.
+  EXPECT_LE(messages_left_, 1);
+  base::MessageLoop::current()->Quit();
+}
+
+void TestChannelListener::SendNextMessage() {
+  if (--messages_left_ <= 0)
+    base::MessageLoop::current()->Quit();
+  else
+    SendOneMessage(sender_, "Foo");
+}
+
+}
diff --git a/ipc/ipc_test_channel_listener.h b/ipc/ipc_test_channel_listener.h
new file mode 100644
index 0000000..047e15d
--- /dev/null
+++ b/ipc/ipc_test_channel_listener.h
@@ -0,0 +1,44 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_TEST_CHANNEL_LISTENER_H_
+#define IPC_IPC_TEST_CHANNEL_LISTENER_H_
+
+#include "ipc/ipc_listener.h"
+
+namespace IPC {
+
+class Sender;
+
+// A generic listener that expects messages of a certain type (see
+// OnMessageReceived()), and either sends a generic response or quits after the
+// 50th message (or on channel error).
+class TestChannelListener : public Listener {
+ public:
+  static const size_t kLongMessageStringNumBytes = 50000;
+  static void SendOneMessage(Sender* sender, const char* text);
+
+  TestChannelListener() : sender_(NULL), messages_left_(50) {}
+  virtual ~TestChannelListener() {}
+
+  virtual bool OnMessageReceived(const Message& message) OVERRIDE;
+  virtual void OnChannelError() OVERRIDE;
+
+  void Init(Sender* s) {
+    sender_ = s;
+  }
+
+  bool HasSentAll() const { return 0 == messages_left_; }
+
+ protected:
+  void SendNextMessage();
+
+ private:
+  Sender* sender_;
+  int messages_left_;
+};
+
+}
+
+#endif // IPC_IPC_TEST_CHANNEL_LISTENER_H_
diff --git a/ipc/ipc_test_sink.cc b/ipc/ipc_test_sink.cc
index 9e9d1fd..b1a21bf 100644
--- a/ipc/ipc_test_sink.cc
+++ b/ipc/ipc_test_sink.cc
@@ -35,6 +35,15 @@
   return base::ProcessId();
 }
 
+base::ProcessId TestSink::GetSelfPID() const {
+  NOTIMPLEMENTED();
+  return base::ProcessId();
+}
+
+ChannelHandle TestSink::TakePipeHandle() {
+  NOTIMPLEMENTED();
+  return ChannelHandle();
+}
 
 bool TestSink::OnMessageReceived(const Message& msg) {
   ObserverListBase<Listener>::Iterator it(filter_list_);
diff --git a/ipc/ipc_test_sink.h b/ipc/ipc_test_sink.h
index 1a213ee..54bab34 100644
--- a/ipc/ipc_test_sink.h
+++ b/ipc/ipc_test_sink.h
@@ -81,6 +81,8 @@
   virtual bool Connect() OVERRIDE WARN_UNUSED_RESULT;
   virtual void Close() OVERRIDE;
   virtual base::ProcessId GetPeerPID() const OVERRIDE;
+  virtual base::ProcessId GetSelfPID() const OVERRIDE;
+  virtual ChannelHandle TakePipeHandle() OVERRIDE;
 
 #if defined(OS_POSIX) && !defined(OS_NACL)
   virtual int GetClientFileDescriptor() const OVERRIDE;
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
new file mode 100644
index 0000000..27d35b7
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -0,0 +1,596 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_channel_mojo.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/lazy_instance.h"
+#include "ipc/ipc_listener.h"
+#include "mojo/embedder/embedder.h"
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+#include "ipc/file_descriptor_set_posix.h"
+#endif
+
+namespace IPC {
+
+namespace {
+
+// IPC::Listener for bootstrap channels.
+// It should never receive any message.
+class NullListener : public Listener {
+ public:
+  virtual bool OnMessageReceived(const Message&) OVERRIDE {
+    NOTREACHED();
+    return false;
+  }
+
+  virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+    NOTREACHED();
+  }
+
+  virtual void OnChannelError() OVERRIDE {
+    NOTREACHED();
+  }
+
+  virtual void OnBadMessageReceived(const Message& message) OVERRIDE {
+    NOTREACHED();
+  }
+};
+
+base::LazyInstance<NullListener> g_null_listener = LAZY_INSTANCE_INITIALIZER;
+
+class MojoChannelFactory : public ChannelFactory {
+ public:
+  MojoChannelFactory(
+      ChannelHandle channel_handle,
+      Channel::Mode mode,
+      scoped_refptr<base::TaskRunner> io_thread_task_runner)
+      : channel_handle_(channel_handle),
+        mode_(mode),
+        io_thread_task_runner_(io_thread_task_runner) {
+  }
+
+  virtual std::string GetName() const OVERRIDE {
+    return channel_handle_.name;
+  }
+
+  virtual scoped_ptr<Channel> BuildChannel(Listener* listener) OVERRIDE {
+    return ChannelMojo::Create(
+        channel_handle_,
+        mode_,
+        listener,
+        io_thread_task_runner_).PassAs<Channel>();
+  }
+
+ private:
+  ChannelHandle channel_handle_;
+  Channel::Mode mode_;
+  scoped_refptr<base::TaskRunner> io_thread_task_runner_;
+};
+
+mojo::embedder::PlatformHandle ToPlatformHandle(
+    const ChannelHandle& handle) {
+#if defined(OS_POSIX) && !defined(OS_NACL)
+  return mojo::embedder::PlatformHandle(handle.socket.fd);
+#elif defined(OS_WIN)
+  return mojo::embedder::PlatformHandle(handle.pipe.handle);
+#else
+#error "Unsupported Platform!"
+#endif
+}
+
+//------------------------------------------------------------------------------
+
+// TODO(morrita): This should be built using higher-level Mojo construct
+// for clarity and extensibility.
+class HelloMessage {
+ public:
+  static Pickle CreateRequest(int32 pid) {
+    Pickle request;
+    request.WriteString(kHelloRequestMagic);
+    request.WriteInt(pid);
+    return request;
+  }
+
+  static bool ReadRequest(Pickle& pickle, int32* pid) {
+    PickleIterator iter(pickle);
+    std::string hello;
+    if (!iter.ReadString(&hello)) {
+      DLOG(WARNING) << "Failed to Read magic string.";
+      return false;
+    }
+
+    if (hello != kHelloRequestMagic) {
+      DLOG(WARNING) << "Magic mismatch:" << hello;
+      return false;
+    }
+
+    int read_pid;
+    if (!iter.ReadInt(&read_pid)) {
+      DLOG(WARNING) << "Failed to Read PID.";
+      return false;
+    }
+
+    *pid = read_pid;
+    return true;
+  }
+
+  static Pickle CreateResponse(int32 pid) {
+    Pickle request;
+    request.WriteString(kHelloResponseMagic);
+    request.WriteInt(pid);
+    return request;
+  }
+
+  static bool ReadResponse(Pickle& pickle, int32* pid) {
+    PickleIterator iter(pickle);
+    std::string hello;
+    if (!iter.ReadString(&hello)) {
+      DLOG(WARNING) << "Failed to read magic string.";
+      return false;
+    }
+
+    if (hello != kHelloResponseMagic) {
+      DLOG(WARNING) << "Magic mismatch:" << hello;
+      return false;
+    }
+
+    int read_pid;
+    if (!iter.ReadInt(&read_pid)) {
+      DLOG(WARNING) << "Failed to read PID.";
+      return false;
+    }
+
+    *pid = read_pid;
+    return true;
+  }
+
+ private:
+  static const char* kHelloRequestMagic;
+  static const char* kHelloResponseMagic;
+};
+
+const char* HelloMessage::kHelloRequestMagic = "MREQ";
+const char* HelloMessage::kHelloResponseMagic = "MRES";
+
+} // namespace
+
+//------------------------------------------------------------------------------
+
+// A MessagePipeReader implemenation for IPC::Message communication.
+class ChannelMojo::MessageReader : public internal::MessagePipeReader {
+ public:
+  MessageReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+      : internal::MessagePipeReader(pipe.Pass()),
+        owner_(owner) {}
+
+  bool Send(scoped_ptr<Message> message);
+  virtual void OnMessageReceived() OVERRIDE;
+  virtual void OnPipeClosed() OVERRIDE;
+  virtual void OnPipeError(MojoResult error) OVERRIDE;
+
+ private:
+  ChannelMojo* owner_;
+};
+
+void ChannelMojo::MessageReader::OnMessageReceived() {
+  Message message(data_buffer().empty() ? "" : &data_buffer()[0],
+                  static_cast<uint32>(data_buffer().size()));
+
+  std::vector<MojoHandle> handle_buffer;
+  TakeHandleBuffer(&handle_buffer);
+#if defined(OS_POSIX) && !defined(OS_NACL)
+  for (size_t i = 0; i < handle_buffer.size(); ++i) {
+    mojo::embedder::ScopedPlatformHandle platform_handle;
+    MojoResult unwrap_result = mojo::embedder::PassWrappedPlatformHandle(
+        handle_buffer[i], &platform_handle);
+    if (unwrap_result != MOJO_RESULT_OK) {
+      DLOG(WARNING) << "Pipe failed to covert handles. Closing: "
+                    << unwrap_result;
+      CloseWithError(unwrap_result);
+      return;
+    }
+
+    bool ok = message.file_descriptor_set()->Add(platform_handle.release().fd);
+    DCHECK(ok);
+  }
+#else
+  DCHECK(handle_buffer.empty());
+#endif
+
+  message.TraceMessageEnd();
+  owner_->OnMessageReceived(message);
+}
+
+void ChannelMojo::MessageReader::OnPipeClosed() {
+  if (!owner_)
+    return;
+  owner_->OnPipeClosed(this);
+  owner_ = NULL;
+}
+
+void ChannelMojo::MessageReader::OnPipeError(MojoResult error) {
+  if (!owner_)
+    return;
+  owner_->OnPipeError(this);
+}
+
+bool ChannelMojo::MessageReader::Send(scoped_ptr<Message> message) {
+  DCHECK(IsValid());
+
+  message->TraceMessageBegin();
+  std::vector<MojoHandle> handles;
+#if defined(OS_POSIX) && !defined(OS_NACL)
+  if (message->HasFileDescriptors()) {
+    FileDescriptorSet* fdset = message->file_descriptor_set();
+    for (size_t i = 0; i < fdset->size(); ++i) {
+      MojoHandle wrapped_handle;
+      MojoResult wrap_result = CreatePlatformHandleWrapper(
+          mojo::embedder::ScopedPlatformHandle(
+              mojo::embedder::PlatformHandle(
+                  fdset->GetDescriptorAt(i))),
+          &wrapped_handle);
+      if (MOJO_RESULT_OK != wrap_result) {
+        DLOG(WARNING) << "Pipe failed to wrap handles. Closing: "
+                      << wrap_result;
+        CloseWithError(wrap_result);
+        return false;
+      }
+
+      handles.push_back(wrapped_handle);
+    }
+  }
+#endif
+  MojoResult write_result = MojoWriteMessage(
+      handle(),
+      message->data(), static_cast<uint32>(message->size()),
+      handles.empty() ? NULL : &handles[0],
+      static_cast<uint32>(handles.size()),
+      MOJO_WRITE_MESSAGE_FLAG_NONE);
+  if (MOJO_RESULT_OK != write_result) {
+    CloseWithError(write_result);
+    return false;
+  }
+
+  return true;
+}
+
+//------------------------------------------------------------------------------
+
+// MessagePipeReader implemenation for control messages.
+// Actual message handling is implemented by sublcasses.
+class ChannelMojo::ControlReader : public internal::MessagePipeReader {
+ public:
+  ControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+      : internal::MessagePipeReader(pipe.Pass()),
+        owner_(owner) {}
+
+  virtual bool Connect() { return true; }
+  virtual void OnPipeClosed() OVERRIDE;
+  virtual void OnPipeError(MojoResult error) OVERRIDE;
+
+ protected:
+  ChannelMojo* owner_;
+};
+
+void ChannelMojo::ControlReader::OnPipeClosed() {
+  if (!owner_)
+    return;
+  owner_->OnPipeClosed(this);
+  owner_ = NULL;
+}
+
+void ChannelMojo::ControlReader::OnPipeError(MojoResult error) {
+  if (!owner_)
+    return;
+  owner_->OnPipeError(this);
+}
+
+//------------------------------------------------------------------------------
+
+// ControlReader for server-side ChannelMojo.
+class ChannelMojo::ServerControlReader : public ChannelMojo::ControlReader {
+ public:
+  ServerControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+      : ControlReader(pipe.Pass(), owner) { }
+
+  virtual bool Connect() OVERRIDE;
+  virtual void OnMessageReceived() OVERRIDE;
+
+ private:
+  MojoResult SendHelloRequest();
+  MojoResult RespondHelloResponse();
+
+  mojo::ScopedMessagePipeHandle message_pipe_;
+};
+
+bool ChannelMojo::ServerControlReader::Connect() {
+  MojoResult result = SendHelloRequest();
+  if (result != MOJO_RESULT_OK) {
+    CloseWithError(result);
+    return false;
+  }
+
+  return true;
+}
+
+MojoResult ChannelMojo::ServerControlReader::SendHelloRequest() {
+  DCHECK(IsValid());
+  DCHECK(!message_pipe_.is_valid());
+
+  mojo::ScopedMessagePipeHandle self;
+  mojo::ScopedMessagePipeHandle peer;
+  MojoResult create_result = mojo::CreateMessagePipe(
+      NULL, &message_pipe_, &peer);
+  if (MOJO_RESULT_OK != create_result) {
+    DLOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
+    return create_result;
+  }
+
+  MojoHandle peer_to_send = peer.get().value();
+  Pickle request = HelloMessage::CreateRequest(owner_->GetSelfPID());
+  MojoResult write_result = MojoWriteMessage(
+      handle(),
+      request.data(), static_cast<uint32>(request.size()),
+      &peer_to_send, 1,
+      MOJO_WRITE_MESSAGE_FLAG_NONE);
+  if (MOJO_RESULT_OK != write_result) {
+    DLOG(WARNING) << "Writing Hello request failed: " << create_result;
+    return write_result;
+  }
+
+  // |peer| is sent and no longer owned by |this|.
+  (void)peer.release();
+  return MOJO_RESULT_OK;
+}
+
+MojoResult ChannelMojo::ServerControlReader::RespondHelloResponse() {
+  Pickle request(data_buffer().empty() ? "" : &data_buffer()[0],
+                 static_cast<uint32>(data_buffer().size()));
+
+  int32 read_pid = 0;
+  if (!HelloMessage::ReadResponse(request, &read_pid)) {
+    DLOG(ERROR) << "Failed to parse Hello response.";
+    return MOJO_RESULT_UNKNOWN;
+  }
+
+  base::ProcessId pid = static_cast<base::ProcessId>(read_pid);
+  owner_->set_peer_pid(pid);
+  owner_->OnConnected(message_pipe_.Pass());
+  return MOJO_RESULT_OK;
+}
+
+void ChannelMojo::ServerControlReader::OnMessageReceived() {
+  MojoResult result = RespondHelloResponse();
+  if (result != MOJO_RESULT_OK)
+    CloseWithError(result);
+}
+
+//------------------------------------------------------------------------------
+
+// ControlReader for client-side ChannelMojo.
+class ChannelMojo::ClientControlReader : public ChannelMojo::ControlReader {
+ public:
+  ClientControlReader(mojo::ScopedMessagePipeHandle pipe, ChannelMojo* owner)
+      : ControlReader(pipe.Pass(), owner) {}
+
+  virtual void OnMessageReceived() OVERRIDE;
+
+ private:
+  MojoResult RespondHelloRequest(MojoHandle message_channel);
+};
+
+MojoResult ChannelMojo::ClientControlReader::RespondHelloRequest(
+    MojoHandle message_channel) {
+  DCHECK(IsValid());
+
+  mojo::ScopedMessagePipeHandle received_pipe(
+      (mojo::MessagePipeHandle(message_channel)));
+
+  int32 read_request = 0;
+  Pickle request(data_buffer().empty() ? "" : &data_buffer()[0],
+                 static_cast<uint32>(data_buffer().size()));
+  if (!HelloMessage::ReadRequest(request, &read_request)) {
+    DLOG(ERROR) << "Hello request has wrong magic.";
+    return MOJO_RESULT_UNKNOWN;
+  }
+
+  base::ProcessId pid = read_request;
+  Pickle response = HelloMessage::CreateResponse(owner_->GetSelfPID());
+  MojoResult write_result = MojoWriteMessage(
+      handle(),
+      response.data(), static_cast<uint32>(response.size()),
+      NULL, 0,
+      MOJO_WRITE_MESSAGE_FLAG_NONE);
+  if (MOJO_RESULT_OK != write_result) {
+    DLOG(ERROR) << "Writing Hello response failed: " << write_result;
+    return write_result;
+  }
+
+  owner_->set_peer_pid(pid);
+  owner_->OnConnected(received_pipe.Pass());
+  return MOJO_RESULT_OK;
+}
+
+void ChannelMojo::ClientControlReader::OnMessageReceived() {
+  std::vector<MojoHandle> handle_buffer;
+  TakeHandleBuffer(&handle_buffer);
+  if (handle_buffer.size() != 1) {
+    DLOG(ERROR) << "Hello request doesn't contains required handle: "
+                << handle_buffer.size();
+    CloseWithError(MOJO_RESULT_UNKNOWN);
+    return;
+  }
+
+  MojoResult result = RespondHelloRequest(handle_buffer[0]);
+  if (result != MOJO_RESULT_OK) {
+    DLOG(ERROR) << "Failed to respond Hello request. Closing: "
+                << result;
+    CloseWithError(result);
+  }
+}
+
+//------------------------------------------------------------------------------
+
+void ChannelMojo::ChannelInfoDeleter::operator()(
+    mojo::embedder::ChannelInfo* ptr) const {
+  mojo::embedder::DestroyChannelOnIOThread(ptr);
+}
+
+//------------------------------------------------------------------------------
+
+// static
+scoped_ptr<ChannelMojo> ChannelMojo::Create(
+    scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+    scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+  return make_scoped_ptr(new ChannelMojo(
+      bootstrap.Pass(), mode, listener, io_thread_task_runner));
+}
+
+// static
+scoped_ptr<ChannelMojo> ChannelMojo::Create(
+    const ChannelHandle &channel_handle, Mode mode, Listener* listener,
+    scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+  return Create(
+      Channel::Create(channel_handle, mode, g_null_listener.Pointer()),
+      mode, listener, io_thread_task_runner);
+}
+
+// static
+scoped_ptr<ChannelFactory> ChannelMojo::CreateFactory(
+    const ChannelHandle &channel_handle, Mode mode,
+    scoped_refptr<base::TaskRunner> io_thread_task_runner) {
+  return make_scoped_ptr(
+      new MojoChannelFactory(
+          channel_handle, mode,
+          io_thread_task_runner)).PassAs<ChannelFactory>();
+}
+
+ChannelMojo::ChannelMojo(
+    scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+    scoped_refptr<base::TaskRunner> io_thread_task_runner)
+    : weak_factory_(this),
+      bootstrap_(bootstrap.Pass()),
+      mode_(mode), listener_(listener),
+      peer_pid_(base::kNullProcessId) {
+  DCHECK(mode_ == MODE_SERVER || mode_ == MODE_CLIENT);
+  mojo::ScopedMessagePipeHandle control_pipe
+      = mojo::embedder::CreateChannel(
+          mojo::embedder::ScopedPlatformHandle(
+              ToPlatformHandle(bootstrap_->TakePipeHandle())),
+          io_thread_task_runner,
+          base::Bind(&ChannelMojo::DidCreateChannel, base::Unretained(this)),
+          io_thread_task_runner);
+
+  // MessagePipeReader, that is crated in InitOnIOThread(), should live only in
+  // IO thread, but IPC::Channel can be instantiated outside of it.
+  // So we move the creation to the appropriate thread.
+  if (base::MessageLoopProxy::current() == io_thread_task_runner) {
+    InitOnIOThread(control_pipe.Pass());
+  } else {
+    io_thread_task_runner->PostTask(
+        FROM_HERE,
+        base::Bind(&ChannelMojo::InitOnIOThread,
+                   weak_factory_.GetWeakPtr(),
+                   base::Passed(control_pipe.Pass())));
+  }
+}
+
+ChannelMojo::~ChannelMojo() {
+  Close();
+}
+
+void ChannelMojo::InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe) {
+  control_reader_ = CreateControlReader(control_pipe.Pass());
+}
+
+scoped_ptr<ChannelMojo::ControlReader> ChannelMojo::CreateControlReader(
+    mojo::ScopedMessagePipeHandle pipe) {
+  if (MODE_SERVER == mode_) {
+    return make_scoped_ptr(
+        new ServerControlReader(pipe.Pass(), this)).PassAs<ControlReader>();
+  }
+
+  DCHECK(mode_ == MODE_CLIENT);
+  return make_scoped_ptr(
+      new ClientControlReader(pipe.Pass(), this)).PassAs<ControlReader>();
+}
+
+bool ChannelMojo::Connect() {
+  DCHECK(!message_reader_);
+  return control_reader_->Connect();
+}
+
+void ChannelMojo::Close() {
+  control_reader_.reset();
+  message_reader_.reset();
+  channel_info_.reset();
+}
+
+void ChannelMojo::OnConnected(mojo::ScopedMessagePipeHandle pipe) {
+  message_reader_ = make_scoped_ptr(new MessageReader(pipe.Pass(), this));
+
+  for (size_t i = 0; i < pending_messages_.size(); ++i) {
+    message_reader_->Send(make_scoped_ptr(pending_messages_[i]));
+    pending_messages_[i] = NULL;
+  }
+
+  pending_messages_.clear();
+
+  listener_->OnChannelConnected(GetPeerPID());
+}
+
+void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
+  Close();
+}
+
+void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) {
+  listener_->OnChannelError();
+}
+
+
+bool ChannelMojo::Send(Message* message) {
+  if (!message_reader_) {
+    pending_messages_.push_back(message);
+    return true;
+  }
+
+  return message_reader_->Send(make_scoped_ptr(message));
+}
+
+base::ProcessId ChannelMojo::GetPeerPID() const {
+  return peer_pid_;
+}
+
+base::ProcessId ChannelMojo::GetSelfPID() const {
+  return bootstrap_->GetSelfPID();
+}
+
+ChannelHandle ChannelMojo::TakePipeHandle() {
+  return bootstrap_->TakePipeHandle();
+}
+
+void ChannelMojo::DidCreateChannel(mojo::embedder::ChannelInfo* info) {
+  channel_info_.reset(info);
+}
+
+void ChannelMojo::OnMessageReceived(Message& message) {
+  listener_->OnMessageReceived(message);
+  if (message.dispatch_error())
+    listener_->OnBadMessageReceived(message);
+}
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+int ChannelMojo::GetClientFileDescriptor() const {
+  return bootstrap_->GetClientFileDescriptor();
+}
+
+int ChannelMojo::TakeClientFileDescriptor() {
+  return bootstrap_->TakeClientFileDescriptor();
+}
+#endif  // defined(OS_POSIX) && !defined(OS_NACL)
+
+}  // namespace IPC
diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h
new file mode 100644
index 0000000..b00abc9
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo.h
@@ -0,0 +1,131 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_CHANNEL_MOJO_H_
+#define IPC_IPC_CHANNEL_MOJO_H_
+
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/scoped_vector.h"
+#include "base/memory/weak_ptr.h"
+#include "ipc/ipc_channel.h"
+#include "ipc/ipc_channel_factory.h"
+#include "ipc/ipc_export.h"
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+#include "mojo/public/cpp/system/core.h"
+
+namespace mojo {
+namespace embedder {
+struct ChannelInfo;
+}
+}
+
+namespace IPC {
+
+// Mojo-based IPC::Channel implementation over a platform handle.
+//
+// ChannelMojo builds Mojo MessagePipe using underlying pipe given by
+// "bootstrap" IPC::Channel which creates and owns platform pipe like
+// named socket. The bootstrap Channel is used only for establishing
+// the underlying connection. ChannelMojo takes its handle over once
+// the it is made and puts MessagePipe on it.
+//
+// ChannelMojo has a couple of MessagePipes:
+//
+// * The first MessagePipe, which is built on top of bootstrap handle,
+//   is the "control" pipe. It is used to communicate out-of-band
+//   control messages that aren't visible from IPC::Listener.
+//
+// * The second MessagePipe, which is created by the server channel
+//   and sent to client Channel over the control pipe, is used
+//   to send IPC::Messages as an IPC::Sender.
+//
+// TODO(morrita): Extract handle creation part of IPC::Channel into
+//                separate class to clarify what ChannelMojo relies
+//                on.
+// TODO(morrita): Add APIs to create extra MessagePipes to let
+//                Mojo-based objects talk over this Channel.
+//
+class IPC_MOJO_EXPORT ChannelMojo : public Channel {
+ public:
+  // Create ChannelMojo on top of given |bootstrap| channel.
+  static scoped_ptr<ChannelMojo> Create(
+      scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+      scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+  // Create ChannelMojo. A bootstrap channel is created as well.
+  static scoped_ptr<ChannelMojo> Create(
+      const ChannelHandle &channel_handle, Mode mode, Listener* listener,
+      scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+  // Create a factory object for ChannelMojo.
+  // The factory is used to create Mojo-based ChannelProxy family.
+  static scoped_ptr<ChannelFactory> CreateFactory(
+      const ChannelHandle &channel_handle, Mode mode,
+      scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+  virtual ~ChannelMojo();
+
+  // Channel implementation
+  virtual bool Connect() OVERRIDE;
+  virtual void Close() OVERRIDE;
+  virtual bool Send(Message* message) OVERRIDE;
+  virtual base::ProcessId GetPeerPID() const OVERRIDE;
+  virtual base::ProcessId GetSelfPID() const OVERRIDE;
+  virtual ChannelHandle TakePipeHandle() OVERRIDE;
+
+#if defined(OS_POSIX) && !defined(OS_NACL)
+  virtual int GetClientFileDescriptor() const OVERRIDE;
+  virtual int TakeClientFileDescriptor() OVERRIDE;
+#endif  // defined(OS_POSIX) && !defined(OS_NACL)
+
+  // Called from MessagePipeReader implementations
+  void OnMessageReceived(Message& message);
+  void OnConnected(mojo::ScopedMessagePipeHandle pipe);
+  void OnPipeClosed(internal::MessagePipeReader* reader);
+  void OnPipeError(internal::MessagePipeReader* reader);
+  void set_peer_pid(base::ProcessId pid) { peer_pid_ = pid; }
+
+ private:
+  struct ChannelInfoDeleter {
+    void operator()(mojo::embedder::ChannelInfo* ptr) const;
+  };
+
+  // ChannelMojo needs to kill its MessagePipeReader in delayed manner
+  // because the channel wants to kill these readers during the
+  // notifications invoked by them.
+  typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter;
+
+  class ControlReader;
+  class ServerControlReader;
+  class ClientControlReader;
+  class MessageReader;
+
+  ChannelMojo(scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
+              scoped_refptr<base::TaskRunner> io_thread_task_runner);
+
+  void InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe);
+  scoped_ptr<ControlReader> CreateControlReader(
+      mojo::ScopedMessagePipeHandle pipe);
+  void DidCreateChannel(mojo::embedder::ChannelInfo*);
+
+  base::WeakPtrFactory<ChannelMojo> weak_factory_;
+  scoped_ptr<Channel> bootstrap_;
+  Mode mode_;
+  Listener* listener_;
+  base::ProcessId peer_pid_;
+  scoped_ptr<mojo::embedder::ChannelInfo,
+             ChannelInfoDeleter> channel_info_;
+
+  scoped_ptr<ControlReader, ReaderDeleter> control_reader_;
+  scoped_ptr<MessageReader, ReaderDeleter> message_reader_;
+  ScopedVector<Message> pending_messages_;
+
+  DISALLOW_COPY_AND_ASSIGN(ChannelMojo);
+};
+
+}  // namespace IPC
+
+#endif  // IPC_IPC_CHANNEL_MOJO_H_
diff --git a/ipc/mojo/ipc_channel_mojo_unittest.cc b/ipc/mojo/ipc_channel_mojo_unittest.cc
new file mode 100644
index 0000000..915029d
--- /dev/null
+++ b/ipc/mojo/ipc_channel_mojo_unittest.cc
@@ -0,0 +1,260 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_channel_mojo.h"
+
+#include "base/base_paths.h"
+#include "base/files/file.h"
+#include "base/message_loop/message_loop.h"
+#include "base/path_service.h"
+#include "base/pickle.h"
+#include "base/threading/thread.h"
+#include "ipc/ipc_message.h"
+#include "ipc/ipc_test_base.h"
+#include "ipc/ipc_test_channel_listener.h"
+
+#if defined(OS_POSIX)
+#include "base/file_descriptor_posix.h"
+#endif
+
+namespace {
+
+class ListenerThatExpectsOK : public IPC::Listener {
+ public:
+  ListenerThatExpectsOK() {}
+
+  virtual ~ListenerThatExpectsOK() {}
+
+  virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+    PickleIterator iter(message);
+    std::string should_be_ok;
+    EXPECT_TRUE(iter.ReadString(&should_be_ok));
+    EXPECT_EQ(should_be_ok, "OK");
+    base::MessageLoop::current()->Quit();
+    return true;
+  }
+
+  virtual void OnChannelError() OVERRIDE {
+    NOTREACHED();
+  }
+
+  static void SendOK(IPC::Sender* sender) {
+    IPC::Message* message = new IPC::Message(
+        0, 2, IPC::Message::PRIORITY_NORMAL);
+    message->WriteString(std::string("OK"));
+    ASSERT_TRUE(sender->Send(message));
+  }
+};
+
+class ListenerThatShouldBeNeverCalled : public IPC::Listener {
+  virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+    NOTREACHED();
+    return true;
+  }
+
+  virtual void OnChannelError() OVERRIDE {
+    NOTREACHED();
+  }
+
+  virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+    NOTREACHED();
+  }
+
+  virtual void OnBadMessageReceived(const IPC::Message& message) OVERRIDE {
+    NOTREACHED();
+  }
+};
+
+class ChannelClient {
+ public:
+  explicit ChannelClient(IPC::Listener* listener, const char* name) {
+    scoped_ptr<IPC::Channel> bootstrap(IPC::Channel::CreateClient(
+        IPCTestBase::GetChannelName(name),
+        &never_called_));
+    channel_ = IPC::ChannelMojo::Create(
+        bootstrap.Pass(), IPC::Channel::MODE_CLIENT, listener,
+        main_message_loop_.message_loop_proxy());
+  }
+
+  void Connect() {
+    CHECK(channel_->Connect());
+  }
+
+  IPC::ChannelMojo* channel() const { return channel_.get(); }
+
+ private:
+  scoped_ptr<IPC::ChannelMojo> channel_;
+  ListenerThatShouldBeNeverCalled never_called_;
+  base::MessageLoopForIO main_message_loop_;
+};
+
+class IPCChannelMojoTest : public IPCTestBase {
+ public:
+  void CreateMojoChannel(IPC::Listener* listener);
+
+ protected:
+  virtual void SetUp() OVERRIDE {
+    IPCTestBase::SetUp();
+  }
+
+  ListenerThatShouldBeNeverCalled never_called_;
+};
+
+
+void IPCChannelMojoTest::CreateMojoChannel(IPC::Listener* listener) {
+  CreateChannel(&never_called_);
+  scoped_ptr<IPC::Channel> mojo_channel = IPC::ChannelMojo::Create(
+      ReleaseChannel(), IPC::Channel::MODE_SERVER, listener,
+      io_thread_task_runner()).PassAs<IPC::Channel>();
+  SetChannel(mojo_channel.PassAs<IPC::Channel>());
+}
+
+class TestChannelListenerWithExtraExpectations
+    : public IPC::TestChannelListener {
+ public:
+  TestChannelListenerWithExtraExpectations()
+      : is_connected_called_(false) {
+  }
+
+  virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
+    IPC::TestChannelListener::OnChannelConnected(peer_pid);
+    EXPECT_TRUE(base::kNullProcessId != peer_pid);
+    is_connected_called_ = true;
+  }
+
+  bool is_connected_called() const { return is_connected_called_; }
+
+ private:
+  bool is_connected_called_;
+};
+
+TEST_F(IPCChannelMojoTest, ConnectedFromClient) {
+  Init("IPCChannelMojoTestClient");
+
+  // Set up IPC channel and start client.
+  TestChannelListenerWithExtraExpectations listener;
+  CreateMojoChannel(&listener);
+  listener.Init(sender());
+  ASSERT_TRUE(ConnectChannel());
+  ASSERT_TRUE(StartClient());
+
+  IPC::TestChannelListener::SendOneMessage(
+      sender(), "hello from parent");
+
+  base::MessageLoop::current()->Run();
+  EXPECT_TRUE(base::kNullProcessId != this->channel()->GetPeerPID());
+
+  this->channel()->Close();
+
+  EXPECT_TRUE(WaitForClientShutdown());
+  EXPECT_TRUE(listener.is_connected_called());
+  EXPECT_TRUE(listener.HasSentAll());
+
+  DestroyChannel();
+}
+
+// A long running process that connects to us
+MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestClient) {
+  TestChannelListenerWithExtraExpectations listener;
+  ChannelClient client(&listener, "IPCChannelMojoTestClient");
+  client.Connect();
+  listener.Init(client.channel());
+
+  IPC::TestChannelListener::SendOneMessage(
+      client.channel(), "hello from child");
+  base::MessageLoop::current()->Run();
+  EXPECT_TRUE(listener.is_connected_called());
+  EXPECT_TRUE(listener.HasSentAll());
+
+  return 0;
+}
+
+#if defined(OS_POSIX)
+class ListenerThatExpectsFile : public IPC::Listener {
+ public:
+  ListenerThatExpectsFile()
+      : sender_(NULL) {}
+
+  virtual ~ListenerThatExpectsFile() {}
+
+  virtual bool OnMessageReceived(const IPC::Message& message) OVERRIDE {
+    PickleIterator iter(message);
+    base::FileDescriptor desc;
+    EXPECT_TRUE(message.ReadFileDescriptor(&iter, &desc));
+    std::string content(GetSendingFileContent().size(), ' ');
+    base::File file(desc.fd);
+    file.Read(0, &content[0], content.size());
+    EXPECT_EQ(content, GetSendingFileContent());
+    base::MessageLoop::current()->Quit();
+    ListenerThatExpectsOK::SendOK(sender_);
+    return true;
+  }
+
+  virtual void OnChannelError() OVERRIDE {
+    NOTREACHED();
+  }
+
+  static std::string GetSendingFileContent() {
+    return "Hello";
+  }
+
+  static base::FilePath GetSendingFilePath() {
+    base::FilePath path;
+    bool ok = PathService::Get(base::DIR_CACHE, &path);
+    EXPECT_TRUE(ok);
+    return path.Append("ListenerThatExpectsFile.txt");
+  }
+
+  static void WriteAndSendFile(IPC::Sender* sender, base::File& file) {
+    std::string content = GetSendingFileContent();
+    file.WriteAtCurrentPos(content.data(), content.size());
+    file.Flush();
+    IPC::Message* message = new IPC::Message(
+        0, 2, IPC::Message::PRIORITY_NORMAL);
+    message->WriteFileDescriptor(
+        base::FileDescriptor(file.TakePlatformFile(), false));
+    ASSERT_TRUE(sender->Send(message));
+  }
+
+  void set_sender(IPC::Sender* sender) { sender_ = sender; }
+
+ private:
+  IPC::Sender* sender_;
+};
+
+
+TEST_F(IPCChannelMojoTest, SendPlatformHandle) {
+  Init("IPCChannelMojoTestSendPlatformHandleClient");
+
+  ListenerThatExpectsOK listener;
+  CreateMojoChannel(&listener);
+  ASSERT_TRUE(ConnectChannel());
+  ASSERT_TRUE(StartClient());
+
+  base::File file(ListenerThatExpectsFile::GetSendingFilePath(),
+                  base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE |
+                  base::File::FLAG_READ);
+  ListenerThatExpectsFile::WriteAndSendFile(channel(), file);
+  base::MessageLoop::current()->Run();
+
+  this->channel()->Close();
+
+  EXPECT_TRUE(WaitForClientShutdown());
+  DestroyChannel();
+}
+
+MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCChannelMojoTestSendPlatformHandleClient) {
+  ListenerThatExpectsFile listener;
+  ChannelClient client(
+      &listener, "IPCChannelMojoTestSendPlatformHandleClient");
+  client.Connect();
+  listener.set_sender(client.channel());
+
+  base::MessageLoop::current()->Run();
+
+  return 0;
+}
+#endif
+
+}  // namespace
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
new file mode 100644
index 0000000..91022ac
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -0,0 +1,144 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/mojo/ipc_message_pipe_reader.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop_proxy.h"
+#include "mojo/public/cpp/environment/environment.h"
+
+namespace IPC {
+namespace internal {
+
+MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
+    : pipe_wait_id_(0),
+      pipe_(handle.Pass()) {
+  StartWaiting();
+}
+
+MessagePipeReader::~MessagePipeReader() {
+  CHECK(!IsValid());
+}
+
+void MessagePipeReader::Close() {
+  StopWaiting();
+  pipe_.reset();
+  OnPipeClosed();
+}
+
+void MessagePipeReader::CloseWithError(MojoResult error) {
+  OnPipeError(error);
+  Close();
+}
+
+// static
+void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
+  reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
+}
+
+void MessagePipeReader::StartWaiting() {
+  DCHECK(pipe_.is_valid());
+  DCHECK(!pipe_wait_id_);
+  // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
+  // MessagePipe.
+  //
+  // TODO(morrita): Should we re-set the signal when we get new
+  // message to send?
+  pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
+      pipe_.get().value(),
+      MOJO_HANDLE_SIGNAL_READABLE,
+      MOJO_DEADLINE_INDEFINITE,
+      &InvokePipeIsReady,
+      this);
+}
+
+void MessagePipeReader::StopWaiting() {
+  if (!pipe_wait_id_)
+    return;
+  mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
+  pipe_wait_id_ = 0;
+}
+
+void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
+  pipe_wait_id_ = 0;
+
+  if (wait_result != MOJO_RESULT_OK) {
+    // FAILED_PRECONDITION happens when the pipe is
+    // closed before the waiter is scheduled in a backend thread.
+    if (wait_result != MOJO_RESULT_ABORTED &&
+        wait_result != MOJO_RESULT_FAILED_PRECONDITION) {
+      DLOG(WARNING) << "Pipe got error from the waiter. Closing: "
+                    << wait_result;
+      OnPipeError(wait_result);
+    }
+
+    Close();
+    return;
+  }
+
+  while (pipe_.is_valid()) {
+    MojoResult read_result = ReadMessageBytes();
+    if (read_result == MOJO_RESULT_SHOULD_WAIT)
+      break;
+    if (read_result != MOJO_RESULT_OK) {
+      // FAILED_PRECONDITION means that all the received messages
+      // got consumed and the peer is already closed.
+      if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
+        DLOG(WARNING)
+            << "Pipe got error from ReadMessage(). Closing: " << read_result;
+        OnPipeError(read_result);
+      }
+
+      Close();
+      break;
+    }
+
+    OnMessageReceived();
+  }
+
+  if (pipe_.is_valid())
+    StartWaiting();
+}
+
+MojoResult MessagePipeReader::ReadMessageBytes() {
+  DCHECK(handle_buffer_.empty());
+
+  uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
+  uint32_t num_handles = 0;
+  MojoResult result = MojoReadMessage(pipe_.get().value(),
+                                      num_bytes ? &data_buffer_[0] : NULL,
+                                      &num_bytes,
+                                      NULL,
+                                      &num_handles,
+                                      MOJO_READ_MESSAGE_FLAG_NONE);
+  data_buffer_.resize(num_bytes);
+  handle_buffer_.resize(num_handles);
+  if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
+    // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
+    // it needs more bufer. So we re-read it with resized buffers.
+    result = MojoReadMessage(pipe_.get().value(),
+                             num_bytes ? &data_buffer_[0] : NULL,
+                             &num_bytes,
+                             num_handles ? &handle_buffer_[0] : NULL,
+                             &num_handles,
+                             MOJO_READ_MESSAGE_FLAG_NONE);
+  }
+
+  DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
+  DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
+  return result;
+}
+
+void MessagePipeReader::DelayedDeleter::operator()(
+    MessagePipeReader* ptr) const {
+  ptr->Close();
+  base::MessageLoopProxy::current()->PostTask(
+      FROM_HERE, base::Bind(&DeleteNow, ptr));
+}
+
+}  // namespace internal
+}  // namespace IPC
diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h
new file mode 100644
index 0000000..ecfa018
--- /dev/null
+++ b/ipc/mojo/ipc_message_pipe_reader.h
@@ -0,0 +1,98 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_MESSAGE_PIPE_READER_H_
+#define IPC_IPC_MESSAGE_PIPE_READER_H_
+
+#include <vector>
+
+#include "base/memory/scoped_ptr.h"
+#include "mojo/public/c/environment/async_waiter.h"
+#include "mojo/public/cpp/system/core.h"
+
+namespace IPC {
+namespace internal {
+
+// A helper class to handle bytestream directly over mojo::MessagePipe
+// in template-method pattern. MessagePipeReader manages the lifetime
+// of given MessagePipe and participates the event loop, and
+// read the stream and call the client when it is ready.
+//
+// Each client has to:
+//
+//  * Provide a subclass implemenation of a specific use of a MessagePipe
+//    and implement callbacks.
+//  * Create the subclass instance with a MessagePipeHandle.
+//    The constructor automatically start listening on the pipe.
+//
+// MessageReader has to be used in IO thread. It isn't thread-safe.
+//
+class MessagePipeReader {
+ public:
+  // Delay the object deletion using the current message loop.
+  // This is intended to used by MessagePipeReader owners.
+  class DelayedDeleter {
+   public:
+    typedef base::DefaultDeleter<MessagePipeReader> DefaultType;
+
+    static void DeleteNow(MessagePipeReader* ptr) { delete ptr; }
+
+    DelayedDeleter() {}
+    DelayedDeleter(const DefaultType&) {}
+    DelayedDeleter& operator=(const DefaultType&) { return *this; }
+
+    void operator()(MessagePipeReader* ptr) const;
+  };
+
+  explicit MessagePipeReader(mojo::ScopedMessagePipeHandle handle);
+  virtual ~MessagePipeReader();
+
+  MojoHandle handle() const { return pipe_.get().value(); }
+
+  // Returns received bytes.
+  const std::vector<char>& data_buffer() const {
+    return data_buffer_;
+  }
+
+  // Delegate received handles ownership. The subclass should take the
+  // ownership over in its OnMessageReceived(). They will leak otherwise.
+  void TakeHandleBuffer(std::vector<MojoHandle>* handle_buffer) {
+    handle_buffer_.swap(*handle_buffer);
+  }
+
+  // Close and destroy the MessagePipe.
+  void Close();
+  // Close the mesage pipe with notifying the client with the error.
+  void CloseWithError(MojoResult error);
+  // Return true if the MessagePipe is alive.
+  bool IsValid() { return pipe_.is_valid(); }
+
+  //
+  // The client have to implment these callback to get the readiness
+  // event from the reader
+  //
+  virtual void OnMessageReceived() = 0;
+  virtual void OnPipeClosed() = 0;
+  virtual void OnPipeError(MojoResult error) = 0;
+
+ private:
+  static void InvokePipeIsReady(void* closure, MojoResult result);
+
+  MojoResult ReadMessageBytes();
+  void PipeIsReady(MojoResult wait_result);
+  void StartWaiting();
+  void StopWaiting();
+
+  std::vector<char>  data_buffer_;
+  std::vector<MojoHandle> handle_buffer_;
+  MojoAsyncWaitID pipe_wait_id_;
+  mojo::ScopedMessagePipeHandle pipe_;
+
+  DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
+};
+
+}  // namespace internal
+}  // namespace IPC
+
+#endif  // IPC_IPC_MESSAGE_PIPE_READER_H_
diff --git a/ipc/mojo/run_all_unittests.cc b/ipc/mojo/run_all_unittests.cc
new file mode 100644
index 0000000..1734e5e
--- /dev/null
+++ b/ipc/mojo/run_all_unittests.cc
@@ -0,0 +1,42 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/at_exit.h"
+#include "base/bind.h"
+#include "base/test/launcher/unit_test_launcher.h"
+#include "base/test/test_suite.h"
+#include "mojo/embedder/embedder.h"
+
+#if defined(OS_ANDROID)
+#include "base/android/jni_android.h"
+#include "base/test/test_file_util.h"
+#endif
+
+namespace {
+
+class NoAtExitBaseTestSuite : public base::TestSuite {
+ public:
+  NoAtExitBaseTestSuite(int argc, char** argv)
+      : base::TestSuite(argc, argv, false) {
+  }
+};
+
+int RunTestSuite(int argc, char** argv) {
+  return NoAtExitBaseTestSuite(argc, argv).Run();
+}
+
+}  // namespace
+
+int main(int argc, char** argv) {
+  mojo::embedder::Init();
+#if defined(OS_ANDROID)
+  JNIEnv* env = base::android::AttachCurrentThread();
+  file_util::RegisterContentUriTestUtils(env);
+#else
+  base::AtExitManager at_exit;
+#endif
+  return base::LaunchUnitTestsSerially(argc,
+                                       argv,
+                                       base::Bind(&RunTestSuite, argc, argv));
+}