Support early associated interface binding on ChannelMojo

Changes the associated bindings implementation for ChannelMojo
such that remote interfaces can be acquired immediately upon
ChannelMojo construction rather than having to wait for connection
on the IO thread.

Simplifies the Channel bootstrapping process, removing a round-trip
Init message (and in fact the entire IPC::mojom::Boostrap interface)
since there's no need to actually exchange associated interface handles
over the pipe. Instead both sides can assume the other will use a fixed,
reserved endpoint ID for their IPC::mojom::Channel interface.

This also removes the restriction that associated interfaces must be
added to a Channel after Init. Instead the same constraints apply as
with AddFilter: an associated interface, like a filter, may be added
at any time as long as either Init hasn't been called OR the remote
process hasn't been launched.

The result of this CL is that any place it's safe to AddFilter,
it's also safe to AddAssociatedInterface; and any place it's safe to
Send, it's also safe to GetRemoteAssociatedInterface and begin using
any such remote interface immediately.

Remote interface requests as well as all messages to remote interfaces
retain FIFO with respect to any Send calls on the same thread. Local
interface request dispatch as well as all messages on locally bound
associated interfaces retain FIFO with respect to any OnMessageReceived
calls on the same thread.

BUG=612500,619202

Committed: https://crrev.com/e1037f997da9e1d44ca3b09d4ff32f0465673091
Committed: https://crrev.com/508da24622f957a01b076ccd058bfdccc79068a4
Review-Url: https://codereview.chromium.org/2163633003
Cr-Original-Original-Commit-Position: refs/heads/master@{#406720}
Cr-Original-Commit-Position: refs/heads/master@{#407050}
Cr-Commit-Position: refs/heads/master@{#407264}


CrOS-Libchrome-Original-Commit: 0e4de5f9a519c6cd206448a10eccc7a535e3db64
diff --git a/ipc/ipc.mojom b/ipc/ipc.mojom
index 28d73fb..2896fc2 100644
--- a/ipc/ipc.mojom
+++ b/ipc/ipc.mojom
@@ -22,17 +22,13 @@
 interface GenericInterface {};
 
 interface Channel {
+  // Informs the remote end of this client's PID. Must be called exactly once,
+  // before any calls to Receive() below.
+  SetPeerPid(int32 pid);
+
+  // Transmits a classical Chrome IPC message.
   Receive(array<uint8> data, array<SerializedHandle>? handles);
 
+  // Requests a Channel-associated interface.
   GetAssociatedInterface(string name, associated GenericInterface& request);
 };
-
-// An interface for connecting a pair of Channel interfaces representing a
-// bidirectional IPC channel.
-interface Bootstrap {
-  // Initializes a Chrome IPC channel over |to_client_channel| and
-  // |to_server_channel|. Each side also sends its PID to the other side.
-  Init(associated Channel& to_client_channel,
-       associated Channel to_server_channel,
-       int32 pid) => (int32 pid);
-};
diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h
index 8d886b5..a1917d9 100644
--- a/ipc/ipc_channel.h
+++ b/ipc/ipc_channel.h
@@ -103,10 +103,11 @@
     virtual ~AssociatedInterfaceSupport() {}
 
     // Accesses the AssociatedGroup used to associate new interface endpoints
-    // with this Channel.
+    // with this Channel. Must be safe to call from any thread.
     virtual mojo::AssociatedGroup* GetAssociatedGroup() = 0;
 
-    // Adds an interface factory to this channel for interface |name|.
+    // Adds an interface factory to this channel for interface |name|. Must be
+    // safe to call from any thread.
     virtual void AddGenericAssociatedInterface(
         const std::string& name,
         const GenericAssociatedInterfaceFactory& factory) = 0;
@@ -116,11 +117,6 @@
         const std::string& name,
         mojo::ScopedInterfaceEndpointHandle handle) = 0;
 
-    // Sets the TaskRunner on which to support proxied dispatch for associated
-    // interfaces.
-    virtual void SetProxyTaskRunner(
-        scoped_refptr<base::SingleThreadTaskRunner> task_runner) = 0;
-
     // Template helper to add an interface factory to this channel.
     template <typename Interface>
     using AssociatedInterfaceFactory =
diff --git a/ipc/ipc_channel_mojo.cc b/ipc/ipc_channel_mojo.cc
index 1e4b499..9ce2e66 100644
--- a/ipc/ipc_channel_mojo.cc
+++ b/ipc/ipc_channel_mojo.cc
@@ -16,6 +16,7 @@
 #include "base/lazy_instance.h"
 #include "base/macros.h"
 #include "base/memory/ptr_util.h"
+#include "base/process/process_handle.h"
 #include "base/threading/thread_task_runner_handle.h"
 #include "build/build_config.h"
 #include "ipc/ipc_listener.h"
@@ -269,13 +270,11 @@
     Mode mode,
     Listener* listener,
     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
-    : pipe_(handle.get()),
-      listener_(listener),
-      waiting_connect_(true),
-      weak_factory_(this) {
+    : pipe_(handle.get()), listener_(listener), weak_factory_(this) {
   // Create MojoBootstrap after all members are set as it touches
   // ChannelMojo from a different thread.
-  bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, this);
+  bootstrap_ =
+      MojoBootstrap::Create(std::move(handle), mode, this, ipc_task_runner);
 }
 
 ChannelMojo::~ChannelMojo() {
@@ -284,89 +283,32 @@
 
 bool ChannelMojo::Connect() {
   WillConnect();
-  {
-    base::AutoLock lock(lock_);
-    DCHECK(!task_runner_);
-    task_runner_ = base::ThreadTaskRunnerHandle::Get();
-    DCHECK(!message_reader_);
-  }
+
+  DCHECK(!task_runner_);
+  task_runner_ = base::ThreadTaskRunnerHandle::Get();
+  DCHECK(!message_reader_);
+
   bootstrap_->Connect();
   return true;
 }
 
 void ChannelMojo::Close() {
-  std::unique_ptr<internal::MessagePipeReader, ReaderDeleter> reader;
-  {
-    base::AutoLock lock(lock_);
-    if (!message_reader_)
-      return;
-    // The reader's destructor may re-enter Close, so we swap it out first to
-    // avoid deadlock when freeing it below.
-    std::swap(message_reader_, reader);
-
-    // We might Close() before we Connect().
-    waiting_connect_ = false;
-  }
-
+  // NOTE: The MessagePipeReader's destructor may re-enter this function. Use
+  // caution when changing this method.
+  std::unique_ptr<internal::MessagePipeReader> reader =
+      std::move(message_reader_);
   reader.reset();
+
+  base::AutoLock lock(associated_interface_lock_);
+  associated_interfaces_.clear();
 }
 
 // MojoBootstrap::Delegate implementation
-void ChannelMojo::OnPipesAvailable(
-    mojom::ChannelAssociatedPtrInfo send_channel,
-    mojom::ChannelAssociatedRequest receive_channel,
-    int32_t peer_pid) {
-  InitMessageReader(std::move(send_channel), std::move(receive_channel),
-                    peer_pid);
-}
-
-void ChannelMojo::OnBootstrapError() {
-  listener_->OnChannelError();
-}
-
-void ChannelMojo::OnAssociatedInterfaceRequest(
-    const std::string& name,
-    mojo::ScopedInterfaceEndpointHandle handle) {
-  auto iter = associated_interfaces_.find(name);
-  if (iter != associated_interfaces_.end())
-    iter->second.Run(std::move(handle));
-}
-
-void ChannelMojo::InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
-                                    mojom::ChannelAssociatedRequest receiver,
-                                    base::ProcessId peer_pid) {
-  mojom::ChannelAssociatedPtr sender_ptr;
-  sender_ptr.Bind(std::move(sender));
-  std::unique_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter>
-      reader(new internal::MessagePipeReader(
-          pipe_, std::move(sender_ptr), std::move(receiver), peer_pid, this));
-
-  bool connected = true;
-  {
-    base::AutoLock lock(lock_);
-    for (size_t i = 0; i < pending_messages_.size(); ++i) {
-      if (!reader->Send(std::move(pending_messages_[i]))) {
-        LOG(ERROR) << "Failed to flush pending messages";
-        pending_messages_.clear();
-        connected = false;
-        break;
-      }
-    }
-
-    if (connected) {
-      // We set |message_reader_| here and won't get any |pending_messages_|
-      // hereafter. Although we might have some if there is an error, we don't
-      // care. They cannot be sent anyway.
-      message_reader_ = std::move(reader);
-      pending_messages_.clear();
-      waiting_connect_ = false;
-    }
-  }
-
-  if (connected)
-    listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
-  else
-    OnPipeError();
+void ChannelMojo::OnPipesAvailable(mojom::ChannelAssociatedPtr sender,
+                                   mojom::ChannelAssociatedRequest receiver) {
+  sender->SetPeerPid(GetSelfPID());
+  message_reader_.reset(new internal::MessagePipeReader(
+      pipe_, std::move(sender), std::move(receiver), this));
 }
 
 void ChannelMojo::OnPipeError() {
@@ -380,15 +322,26 @@
   }
 }
 
-bool ChannelMojo::Send(Message* message) {
-  base::AutoLock lock(lock_);
-  if (!message_reader_) {
-    pending_messages_.push_back(base::WrapUnique(message));
-    // Counts as OK before the connection is established, but it's an
-    // error otherwise.
-    return waiting_connect_;
+void ChannelMojo::OnAssociatedInterfaceRequest(
+    const std::string& name,
+    mojo::ScopedInterfaceEndpointHandle handle) {
+  GenericAssociatedInterfaceFactory factory;
+  {
+    base::AutoLock locker(associated_interface_lock_);
+    auto iter = associated_interfaces_.find(name);
+    if (iter != associated_interfaces_.end())
+      factory = iter->second;
   }
 
+  if (!factory.is_null())
+    factory.Run(std::move(handle));
+}
+
+bool ChannelMojo::Send(Message* message) {
+  std::unique_ptr<Message> scoped_message = base::WrapUnique(message);
+  if (!message_reader_)
+    return false;
+
   // Comment copied from ipc_channel_posix.cc:
   // We can't close the pipe here, because calling OnChannelError may destroy
   // this object, and that would be bad if we are called from Send(). Instead,
@@ -398,7 +351,7 @@
   //
   // With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the
   // pipe's connection error handler will be invoked in its place.
-  return message_reader_->Send(base::WrapUnique(message));
+  return message_reader_->Send(std::move(scoped_message));
 }
 
 bool ChannelMojo::IsSendThreadSafe() const {
@@ -406,20 +359,30 @@
 }
 
 base::ProcessId ChannelMojo::GetPeerPID() const {
-  base::AutoLock lock(lock_);
   if (!message_reader_)
     return base::kNullProcessId;
-
   return message_reader_->GetPeerPid();
 }
 
 base::ProcessId ChannelMojo::GetSelfPID() const {
-  return bootstrap_->GetSelfPID();
+#if defined(OS_LINUX)
+  if (int global_pid = GetGlobalPid())
+    return global_pid;
+#endif  // OS_LINUX
+#if defined(OS_NACL)
+  return -1;
+#else
+  return base::GetCurrentProcId();
+#endif  // defined(OS_NACL)
 }
 
 Channel::AssociatedInterfaceSupport*
 ChannelMojo::GetAssociatedInterfaceSupport() { return this; }
 
+void ChannelMojo::OnPeerPidReceived() {
+  listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
+}
+
 void ChannelMojo::OnMessageReceived(const Message& message) {
   TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
                "class", IPC_MESSAGE_ID_CLASS(message.type()),
@@ -504,6 +467,7 @@
 void ChannelMojo::AddGenericAssociatedInterface(
     const std::string& name,
     const GenericAssociatedInterfaceFactory& factory) {
+  base::AutoLock locker(associated_interface_lock_);
   auto result = associated_interfaces_.insert({ name, factory });
   DCHECK(result.second);
 }
@@ -511,14 +475,8 @@
 void ChannelMojo::GetGenericRemoteAssociatedInterface(
     const std::string& name,
     mojo::ScopedInterfaceEndpointHandle handle) {
-  DCHECK(message_reader_);
-  message_reader_->GetRemoteInterface(name, std::move(handle));
-}
-
-void ChannelMojo::SetProxyTaskRunner(
-    scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
-  DCHECK(bootstrap_);
-  bootstrap_->SetProxyTaskRunner(task_runner);
+  if (message_reader_)
+    message_reader_->GetRemoteInterface(name, std::move(handle));
 }
 
 }  // namespace IPC
diff --git a/ipc/ipc_channel_mojo.h b/ipc/ipc_channel_mojo.h
index 940dcf0..cfadb50 100644
--- a/ipc/ipc_channel_mojo.h
+++ b/ipc/ipc_channel_mojo.h
@@ -42,7 +42,7 @@
 class IPC_EXPORT ChannelMojo
     : public Channel,
       public Channel::AssociatedInterfaceSupport,
-      public MojoBootstrap::Delegate,
+      public NON_EXPORTED_BASE(MojoBootstrap::Delegate),
       public NON_EXPORTED_BASE(internal::MessagePipeReader::Delegate) {
  public:
   // Creates a ChannelMojo.
@@ -90,18 +90,17 @@
       mojo::Array<mojom::SerializedHandlePtr>* handles);
 
   // MojoBootstrapDelegate implementation
-  void OnPipesAvailable(mojom::ChannelAssociatedPtrInfo send_channel,
-                        mojom::ChannelAssociatedRequest receive_channel,
-                        int32_t peer_pid) override;
-  void OnBootstrapError() override;
+  void OnPipesAvailable(mojom::ChannelAssociatedPtr sender,
+                        mojom::ChannelAssociatedRequest receiver) override;
+
+  // MessagePipeReader::Delegate
+  void OnPeerPidReceived() override;
+  void OnMessageReceived(const Message& message) override;
+  void OnPipeError() override;
   void OnAssociatedInterfaceRequest(
       const std::string& name,
       mojo::ScopedInterfaceEndpointHandle handle) override;
 
-  // MessagePipeReader::Delegate
-  void OnMessageReceived(const Message& message) override;
-  void OnPipeError() override;
-
  private:
   ChannelMojo(
       mojo::ScopedMessagePipeHandle handle,
@@ -109,10 +108,6 @@
       Listener* listener,
       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner);
 
-  void InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
-                         mojom::ChannelAssociatedRequest receiver,
-                         base::ProcessId peer_pid);
-
   // Channel::AssociatedInterfaceSupport:
   mojo::AssociatedGroup* GetAssociatedGroup() override;
   void AddGenericAssociatedInterface(
@@ -121,13 +116,6 @@
   void GetGenericRemoteAssociatedInterface(
       const std::string& name,
       mojo::ScopedInterfaceEndpointHandle handle) override;
-  void SetProxyTaskRunner(
-      scoped_refptr<base::SingleThreadTaskRunner> task_runner) override;
-
-  // ChannelMojo needs to kill its MessagePipeReader in delayed manner
-  // because the channel wants to kill these readers during the
-  // notifications invoked by them.
-  typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter;
 
   // A TaskRunner which runs tasks on the ChannelMojo's owning thread.
   scoped_refptr<base::TaskRunner> task_runner_;
@@ -136,15 +124,12 @@
   std::unique_ptr<MojoBootstrap> bootstrap_;
   Listener* listener_;
 
+  std::unique_ptr<internal::MessagePipeReader> message_reader_;
+
+  base::Lock associated_interface_lock_;
   std::map<std::string, GenericAssociatedInterfaceFactory>
       associated_interfaces_;
 
-  // Guards access to the fields below.
-  mutable base::Lock lock_;
-  std::unique_ptr<internal::MessagePipeReader, ReaderDeleter> message_reader_;
-  std::vector<std::unique_ptr<Message>> pending_messages_;
-  bool waiting_connect_;
-
   base::WeakPtrFactory<ChannelMojo> weak_factory_;
 
   DISALLOW_COPY_AND_ASSIGN(ChannelMojo);
diff --git a/ipc/ipc_channel_mojo_unittest.cc b/ipc/ipc_channel_mojo_unittest.cc
index 0416720..f7652e7 100644
--- a/ipc/ipc_channel_mojo_unittest.cc
+++ b/ipc/ipc_channel_mojo_unittest.cc
@@ -758,6 +758,10 @@
   }
   void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); }
   void RunProxy() { runner_->RunProxy(); }
+  void DestroyProxy() {
+    runner_.reset();
+    base::RunLoop().RunUntilIdle();
+  }
 
   IPC::ChannelProxy* proxy() { return runner_->proxy(); }
 
@@ -808,6 +812,7 @@
   void RequestQuit(const RequestQuitCallback& callback) override {
     received_quit_ = true;
     callback.Run();
+    binding_.Close();
     base::MessageLoop::current()->QuitWhenIdle();
   }
 
@@ -838,7 +843,7 @@
   EXPECT_TRUE(WaitForClientShutdown());
   EXPECT_TRUE(listener.received_all_messages());
 
-  base::RunLoop().RunUntilIdle();
+  DestroyProxy();
 }
 
 class ChannelProxyClient {
@@ -848,6 +853,10 @@
   }
   void CreateProxy(IPC::Listener* listener) { runner_->CreateProxy(listener); }
   void RunProxy() { runner_->RunProxy(); }
+  void DestroyProxy() {
+    runner_.reset();
+    base::RunLoop().RunUntilIdle();
+  }
 
   IPC::ChannelProxy* proxy() { return runner_->proxy(); }
 
@@ -856,26 +865,17 @@
   std::unique_ptr<ChannelProxyRunner> runner_;
 };
 
-class ListenerThatWaitsForConnect : public IPC::Listener {
+class DummyListener : public IPC::Listener {
  public:
-  explicit ListenerThatWaitsForConnect(const base::Closure& connect_handler)
-      : connect_handler_(connect_handler) {}
-
   // IPC::Listener
   bool OnMessageReceived(const IPC::Message& message) override { return true; }
-  void OnChannelConnected(int32_t) override { connect_handler_.Run(); }
-
- private:
-  base::Closure connect_handler_;
 };
 
 DEFINE_IPC_CHANNEL_MOJO_TEST_CLIENT(ProxyThreadAssociatedInterfaceClient,
                                     ChannelProxyClient) {
-  base::RunLoop connect_loop;
-  ListenerThatWaitsForConnect listener(connect_loop.QuitClosure());
+  DummyListener listener;
   CreateProxy(&listener);
   RunProxy();
-  connect_loop.Run();
 
   // Send a bunch of interleaved messages, alternating between the associated
   // interface and a legacy IPC::Message.
@@ -889,6 +889,8 @@
   }
   driver->RequestQuit(base::MessageLoop::QuitWhenIdleClosure());
   base::RunLoop().Run();
+
+  DestroyProxy();
 }
 
 #if defined(OS_POSIX)
diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc
index 25229ac..2f4b7cf 100644
--- a/ipc/ipc_channel_proxy.cc
+++ b/ipc/ipc_channel_proxy.cc
@@ -79,6 +79,17 @@
   channel_ = factory->BuildChannel(this);
   channel_send_thread_safe_ = channel_->IsSendThreadSafe();
   channel_->SetAttachmentBrokerEndpoint(attachment_broker_endpoint_);
+
+  Channel::AssociatedInterfaceSupport* support =
+      channel_->GetAssociatedInterfaceSupport();
+  if (support) {
+    associated_group_ = *support->GetAssociatedGroup();
+
+    base::AutoLock l(pending_filters_lock_);
+    for (auto& entry : pending_interfaces_)
+      support->AddGenericAssociatedInterface(entry.first, entry.second);
+    pending_interfaces_.clear();
+  }
 }
 
 bool ChannelProxy::Context::TryFilters(const Message& message) {
@@ -162,23 +173,6 @@
 
   for (size_t i = 0; i < filters_.size(); ++i)
     filters_[i]->OnFilterAdded(channel_.get());
-
-  Channel::AssociatedInterfaceSupport* support =
-      channel_->GetAssociatedInterfaceSupport();
-  if (support) {
-    support->SetProxyTaskRunner(listener_task_runner_);
-    for (auto& entry : io_thread_interfaces_)
-      support->AddGenericAssociatedInterface(entry.first, entry.second);
-    for (auto& entry : proxy_thread_interfaces_) {
-      support->AddGenericAssociatedInterface(
-          entry.first, base::Bind(&BindAssociatedInterfaceOnTaskRunner,
-                                  listener_task_runner_, entry.second));
-    }
-  } else {
-    // Sanity check to ensure nobody's expecting to use associated interfaces on
-    // a Channel that doesn't support them.
-    DCHECK(io_thread_interfaces_.empty() && proxy_thread_interfaces_.empty());
-  }
 }
 
 // Called on the IPC::Channel thread
@@ -332,18 +326,6 @@
   if (channel_connected_called_)
     return;
 
-  {
-    base::AutoLock l(channel_lifetime_lock_);
-    if (channel_) {
-      Channel::AssociatedInterfaceSupport* associated_interface_support =
-          channel_->GetAssociatedInterfaceSupport();
-      if (associated_interface_support) {
-        channel_associated_group_.reset(new mojo::AssociatedGroup(
-            *associated_interface_support->GetAssociatedGroup()));
-      }
-    }
-  }
-
   base::ProcessId peer_pid;
   {
     base::AutoLock l(peer_pid_lock_);
@@ -369,6 +351,30 @@
 void ChannelProxy::Context::ClearChannel() {
   base::AutoLock l(channel_lifetime_lock_);
   channel_.reset();
+  associated_group_ = mojo::AssociatedGroup();
+}
+
+void ChannelProxy::Context::AddGenericAssociatedInterface(
+    const std::string& name,
+    const GenericAssociatedInterfaceFactory& factory) {
+  AddGenericAssociatedInterfaceForIOThread(
+      name, base::Bind(&BindAssociatedInterfaceOnTaskRunner,
+                       listener_task_runner_, factory));
+}
+
+void ChannelProxy::Context::AddGenericAssociatedInterfaceForIOThread(
+    const std::string& name,
+    const GenericAssociatedInterfaceFactory& factory) {
+  base::AutoLock l(channel_lifetime_lock_);
+  if (!channel_) {
+    base::AutoLock l(pending_filters_lock_);
+    pending_interfaces_.emplace_back(name, factory);
+    return;
+  }
+  Channel::AssociatedInterfaceSupport* support =
+      channel_->GetAssociatedInterfaceSupport();
+  DCHECK(support);
+  support->AddGenericAssociatedInterface(name, factory);
 }
 
 void ChannelProxy::Context::SendFromThisThread(Message* message) {
@@ -546,24 +552,20 @@
                             base::RetainedRef(filter)));
 }
 
-void ChannelProxy::AddGenericAssociatedInterfaceForIOThread(
-    const std::string& name,
-    const GenericAssociatedInterfaceFactory& factory) {
-  DCHECK(CalledOnValidThread());
-  DCHECK(!did_init_);
-  context_->io_thread_interfaces_.insert({ name, factory });
-}
-
 void ChannelProxy::AddGenericAssociatedInterface(
     const std::string& name,
     const GenericAssociatedInterfaceFactory& factory) {
-  DCHECK(CalledOnValidThread());
-  DCHECK(!did_init_);
-  context_->proxy_thread_interfaces_.insert({ name, factory });
+  context()->AddGenericAssociatedInterface(name, factory);
+}
+
+void ChannelProxy::AddGenericAssociatedInterfaceForIOThread(
+    const std::string& name,
+    const GenericAssociatedInterfaceFactory& factory) {
+  context()->AddGenericAssociatedInterfaceForIOThread(name, factory);
 }
 
 mojo::AssociatedGroup* ChannelProxy::GetAssociatedGroup() {
-  return context_->channel_associated_group_.get();
+  return context()->associated_group();
 }
 
 void ChannelProxy::GetGenericRemoteAssociatedInterface(
diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h
index 03098c4..ebbe3e3 100644
--- a/ipc/ipc_channel_proxy.h
+++ b/ipc/ipc_channel_proxy.h
@@ -189,8 +189,7 @@
   }
 
   // Gets the AssociatedGroup used to create new associated endpoints on this
-  // ChannelProxy. This must only be called after the listener's
-  // OnChannelConnected is called.
+  // ChannelProxy.
   mojo::AssociatedGroup* GetAssociatedGroup();
 
   // Requests an associated interface from the remote endpoint.
@@ -199,7 +198,6 @@
       mojo::ScopedInterfaceEndpointHandle handle);
 
   // Template helper to request associated interfaces from the remote endpoint.
-  // Must only be called after the listener's OnChannelConnected is called.
   template <typename Interface>
   void GetRemoteAssociatedInterface(
       mojo::AssociatedInterfacePtr<Interface>* proxy) {
@@ -231,7 +229,7 @@
   class Context;
   // A subclass uses this constructor if it needs to add more information
   // to the internal state.
-  ChannelProxy(Context* context);
+  explicit ChannelProxy(Context* context);
 
 
   // Used internally to hold state that is referenced on the IPC thread.
@@ -312,6 +310,15 @@
     void SendFromThisThread(Message* message);
     void ClearChannel();
 
+    mojo::AssociatedGroup* associated_group() { return &associated_group_; }
+
+    void AddGenericAssociatedInterface(
+        const std::string& name,
+        const GenericAssociatedInterfaceFactory& factory);
+    void AddGenericAssociatedInterfaceForIOThread(
+        const std::string& name,
+        const GenericAssociatedInterfaceFactory& factory);
+
     scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
     Listener* listener_;
 
@@ -353,15 +360,14 @@
     // brokerable attachment messages to/from the broker process.
     bool attachment_broker_endpoint_;
 
-    // Modified only on the listener's thread before Init() is called.
-    std::map<std::string, GenericAssociatedInterfaceFactory>
-        io_thread_interfaces_;
-    std::map<std::string, GenericAssociatedInterfaceFactory>
-        proxy_thread_interfaces_;
+    mojo::AssociatedGroup associated_group_;
 
-    // Valid and constant any time after the ChannelProxy's Listener receives
-    // OnChannelConnected on its own thread.
-    std::unique_ptr<mojo::AssociatedGroup> channel_associated_group_;
+    // Holds associated interface binders added by AddGenericAssociatedInterface
+    // or AddGenericAssociatedInterfaceForIOThread until the underlying channel
+    // has been initialized.
+    base::Lock pending_interfaces_lock_;
+    std::vector<std::pair<std::string, GenericAssociatedInterfaceFactory>>
+        pending_interfaces_;
   };
 
   Context* context() { return context_.get(); }
diff --git a/ipc/ipc_message_pipe_reader.cc b/ipc/ipc_message_pipe_reader.cc
index 74c7285..8e1b355 100644
--- a/ipc/ipc_message_pipe_reader.cc
+++ b/ipc/ipc_message_pipe_reader.cc
@@ -55,10 +55,8 @@
     mojo::MessagePipeHandle pipe,
     mojom::ChannelAssociatedPtr sender,
     mojo::AssociatedInterfaceRequest<mojom::Channel> receiver,
-    base::ProcessId peer_pid,
     MessagePipeReader::Delegate* delegate)
     : delegate_(delegate),
-      peer_pid_(peer_pid),
       sender_(std::move(sender)),
       binding_(this, std::move(receiver)),
       sender_interface_id_(sender_.interface_id()),
@@ -123,9 +121,15 @@
   sender_->GetAssociatedInterface(name, std::move(request));
 }
 
+void MessagePipeReader::SetPeerPid(int32_t peer_pid) {
+  peer_pid_ = peer_pid;
+  delegate_->OnPeerPidReceived();
+}
+
 void MessagePipeReader::Receive(
     mojo::Array<uint8_t> data,
     mojo::Array<mojom::SerializedHandlePtr> handles) {
+  DCHECK_NE(peer_pid_, base::kNullProcessId);
   Message message(
       data.size() == 0 ? "" : reinterpret_cast<const char*>(&data[0]),
       static_cast<uint32_t>(data.size()));
@@ -156,16 +160,12 @@
 
 void MessagePipeReader::OnPipeError(MojoResult error) {
   DCHECK(thread_checker_.CalledOnValidThread());
+
+  Close();
+
+  // NOTE: The delegate call below may delete |this|.
   if (delegate_)
     delegate_->OnPipeError();
-  Close();
-}
-
-void MessagePipeReader::DelayedDeleter::operator()(
-    MessagePipeReader* ptr) const {
-  ptr->Close();
-  base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
-                                                base::Bind(&DeleteNow, ptr));
 }
 
 }  // namespace internal
diff --git a/ipc/ipc_message_pipe_reader.h b/ipc/ipc_message_pipe_reader.h
index 5aec440..e6a2a13 100644
--- a/ipc/ipc_message_pipe_reader.h
+++ b/ipc/ipc_message_pipe_reader.h
@@ -13,6 +13,7 @@
 #include "base/atomicops.h"
 #include "base/compiler_specific.h"
 #include "base/macros.h"
+#include "base/process/process_handle.h"
 #include "base/threading/thread_checker.h"
 #include "ipc/ipc.mojom.h"
 #include "ipc/ipc_export.h"
@@ -47,6 +48,7 @@
  public:
   class Delegate {
    public:
+    virtual void OnPeerPidReceived() = 0;
     virtual void OnMessageReceived(const Message& message) = 0;
     virtual void OnPipeError() = 0;
     virtual void OnAssociatedInterfaceRequest(
@@ -54,21 +56,6 @@
         mojo::ScopedInterfaceEndpointHandle handle) = 0;
   };
 
-  // Delay the object deletion using the current message loop.
-  // This is intended to used by MessagePipeReader owners.
-  class DelayedDeleter {
-   public:
-    typedef std::default_delete<MessagePipeReader> DefaultType;
-
-    static void DeleteNow(MessagePipeReader* ptr) { delete ptr; }
-
-    DelayedDeleter() {}
-    explicit DelayedDeleter(const DefaultType&) {}
-    DelayedDeleter& operator=(const DefaultType&) { return *this; }
-
-    void operator()(MessagePipeReader* ptr) const;
-  };
-
   // Builds a reader that reads messages from |receive_handle| and lets
   // |delegate| know.
   //
@@ -82,7 +69,6 @@
   MessagePipeReader(mojo::MessagePipeHandle pipe,
                     mojom::ChannelAssociatedPtr sender,
                     mojo::AssociatedInterfaceRequest<mojom::Channel> receiver,
-                    base::ProcessId peer_pid,
                     Delegate* delegate);
   ~MessagePipeReader() override;
 
@@ -108,6 +94,7 @@
 
  private:
   // mojom::Channel:
+  void SetPeerPid(int32_t peer_pid) override;
   void Receive(mojo::Array<uint8_t> data,
                mojo::Array<mojom::SerializedHandlePtr> handles) override;
   void GetAssociatedInterface(
@@ -116,7 +103,7 @@
 
   // |delegate_| is null once the message pipe is closed.
   Delegate* delegate_;
-  base::ProcessId peer_pid_;
+  base::ProcessId peer_pid_ = base::kNullProcessId;
   mojom::ChannelAssociatedPtr sender_;
   mojo::AssociatedBinding<mojom::Channel> binding_;
 
diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc
index fc39d0d..a21720b 100644
--- a/ipc/ipc_mojo_bootstrap.cc
+++ b/ipc/ipc_mojo_bootstrap.cc
@@ -9,12 +9,12 @@
 #include <map>
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include "base/callback.h"
 #include "base/logging.h"
 #include "base/macros.h"
 #include "base/memory/ptr_util.h"
-#include "base/process/process_handle.h"
 #include "base/single_thread_task_runner.h"
 #include "base/stl_util.h"
 #include "base/synchronization/lock.h"
@@ -29,6 +29,7 @@
 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
 #include "mojo/public/cpp/bindings/interface_id.h"
+#include "mojo/public/cpp/bindings/message.h"
 #include "mojo/public/cpp/bindings/message_header_validator.h"
 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
@@ -43,39 +44,74 @@
       public mojo::MessageReceiver,
       public mojo::PipeControlMessageHandlerDelegate {
  public:
-  ChannelAssociatedGroupController(bool set_interface_id_namespace_bit,
-                                   mojo::ScopedMessagePipeHandle handle)
+  ChannelAssociatedGroupController(
+      bool set_interface_id_namespace_bit,
+      const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
       : mojo::AssociatedGroupController(base::ThreadTaskRunnerHandle::Get()),
-        task_runner_(base::ThreadTaskRunnerHandle::Get()),
-        id_namespace_mask_(set_interface_id_namespace_bit ?
-              mojo::kInterfaceIdNamespaceMask : 0),
-        associated_group_(CreateAssociatedGroup()),
-        connector_(std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
-                   base::ThreadTaskRunnerHandle::Get()),
+        task_runner_(ipc_task_runner),
+        proxy_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+        set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
         header_validator_(
             "IPC::mojom::Bootstrap [master] MessageHeaderValidator", this),
         control_message_handler_(this),
-        control_message_proxy_(&connector_) {
-    connector_.set_incoming_receiver(&header_validator_);
-    connector_.set_connection_error_handler(
-        base::Bind(&ChannelAssociatedGroupController::OnPipeError,
-                   base::Unretained(this)));
+        control_message_proxy_thunk_(this),
+        control_message_proxy_(&control_message_proxy_thunk_) {
+    thread_checker_.DetachFromThread();
     control_message_handler_.SetDescription(
         "IPC::mojom::Bootstrap [master] PipeControlMessageHandler");
   }
 
-  mojo::AssociatedGroup* associated_group() { return associated_group_.get(); }
+  void Bind(mojo::ScopedMessagePipeHandle handle) {
+    DCHECK(thread_checker_.CalledOnValidThread());
+    DCHECK(task_runner_->BelongsToCurrentThread());
+    thread_task_runner_ = base::ThreadTaskRunnerHandle::Get();
+    connector_.reset(new mojo::Connector(
+        std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
+        task_runner_));
+    connector_->set_incoming_receiver(&header_validator_);
+    connector_->set_connection_error_handler(
+        base::Bind(&ChannelAssociatedGroupController::OnPipeError,
+                   base::Unretained(this)));
+
+    std::vector<std::unique_ptr<mojo::Message>> outgoing_messages;
+    std::swap(outgoing_messages, outgoing_messages_);
+    for (auto& message : outgoing_messages)
+      SendMessage(message.get());
+  }
+
+  void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender,
+                              mojom::ChannelAssociatedRequest* receiver) {
+    mojo::InterfaceId sender_id, receiver_id;
+    if (set_interface_id_namespace_bit_) {
+      sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
+      receiver_id = 1;
+    } else {
+      sender_id = 1;
+      receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
+    }
+
+    {
+      base::AutoLock locker(lock_);
+      Endpoint* sender_endpoint = new Endpoint(this, sender_id);
+      Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
+      endpoints_.insert({ sender_id, sender_endpoint });
+      endpoints_.insert({ receiver_id, receiver_endpoint });
+    }
+
+    mojo::ScopedInterfaceEndpointHandle sender_handle =
+        CreateScopedInterfaceEndpointHandle(sender_id, true);
+    mojo::ScopedInterfaceEndpointHandle receiver_handle =
+        CreateScopedInterfaceEndpointHandle(receiver_id, true);
+
+    sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0));
+    receiver->Bind(std::move(receiver_handle));
+  }
 
   void ShutDown() {
     DCHECK(thread_checker_.CalledOnValidThread());
-    connector_.CloseMessagePipe();
+    connector_->CloseMessagePipe();
     OnPipeError();
-    associated_group_.reset();
-  }
-
-  void SetProxyTaskRunner(
-      scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner) {
-    proxy_task_runner_ = proxy_task_runner;
+    connector_.reset();
   }
 
   // mojo::AssociatedGroupController:
@@ -86,8 +122,10 @@
     uint32_t id = 0;
     do {
       if (next_interface_id_ >= mojo::kInterfaceIdNamespaceMask)
-        next_interface_id_ = 1;
-      id = (next_interface_id_++) | id_namespace_mask_;
+        next_interface_id_ = 2;
+      id = next_interface_id_++;
+      if (set_interface_id_namespace_bit_)
+        id |= mojo::kInterfaceIdNamespaceMask;
     } while (ContainsKey(endpoints_, id));
 
     Endpoint* endpoint = new Endpoint(this, id);
@@ -121,7 +159,7 @@
     if (!is_local) {
       DCHECK(ContainsKey(endpoints_, id));
       DCHECK(!mojo::IsMasterInterfaceId(id));
-      NotifyEndpointClosedBeforeSent(id);
+      control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
       return;
     }
 
@@ -132,7 +170,7 @@
     MarkClosedAndMaybeRemove(endpoint);
 
     if (!mojo::IsMasterInterfaceId(id))
-      NotifyPeerEndpointClosed(id);
+      control_message_proxy_.NotifyPeerEndpointClosed(id);
   }
 
   mojo::InterfaceEndpointController* AttachEndpointClient(
@@ -170,8 +208,8 @@
   }
 
   void RaiseError() override {
-    if (task_runner_->BelongsToCurrentThread()) {
-      connector_.RaiseError();
+    if (IsRunningOnIPCThread()) {
+      connector_->RaiseError();
     } else {
       task_runner_->PostTask(
           FROM_HERE,
@@ -181,7 +219,9 @@
 
  private:
   class Endpoint;
+  class ControlMessageProxyThunk;
   friend class Endpoint;
+  friend class ControlMessageProxyThunk;
 
   class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
                    public mojo::InterfaceEndpointController {
@@ -260,7 +300,7 @@
 
       // It's not legal to make sync calls from the master endpoint's thread,
       // and in fact they must only happen from the proxy task runner.
-      DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
+      DCHECK(!controller_->IsRunningOnIPCThread());
       DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
 
       // TODO(rockot): Implement sync waiting.
@@ -284,9 +324,25 @@
     DISALLOW_COPY_AND_ASSIGN(Endpoint);
   };
 
+  class ControlMessageProxyThunk : public MessageReceiver {
+   public:
+    explicit ControlMessageProxyThunk(
+        ChannelAssociatedGroupController* controller)
+        : controller_(controller) {}
+
+   private:
+    // MessageReceiver:
+    bool Accept(mojo::Message* message) override {
+      return controller_->SendMessage(message);
+    }
+
+    ChannelAssociatedGroupController* controller_;
+
+    DISALLOW_COPY_AND_ASSIGN(ControlMessageProxyThunk);
+  };
+
   ~ChannelAssociatedGroupController() override {
     base::AutoLock locker(lock_);
-
     for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
       Endpoint* endpoint = iter->second.get();
       ++iter;
@@ -298,15 +354,30 @@
     DCHECK(endpoints_.empty());
   }
 
+  bool IsRunningOnIPCThread() {
+    // |task_runner_| is always non-null but may incorrectly report that
+    // BelongsToCurrentThread() == false during shutdown. By the time shutdown
+    // occurs, |thread_task_runner_| will be non-null and is guaranteed to run
+    // tasks on the same thread as |task_runner_|.
+    return task_runner_->BelongsToCurrentThread() ||
+        (thread_task_runner_ && thread_task_runner_->BelongsToCurrentThread());
+  }
+
   bool SendMessage(mojo::Message* message) {
-    if (task_runner_->BelongsToCurrentThread()) {
+    if (IsRunningOnIPCThread()) {
       DCHECK(thread_checker_.CalledOnValidThread());
-      return connector_.Accept(message);
+      if (!connector_) {
+        // Pipe may not be bound yet, so we queue the message.
+        std::unique_ptr<mojo::Message> queued_message(new mojo::Message);
+        message->MoveTo(queued_message.get());
+        outgoing_messages_.emplace_back(std::move(queued_message));
+        return true;
+      }
+      return connector_->Accept(message);
     } else {
       // We always post tasks to the master endpoint thread when called from the
       // proxy thread in order to simulate IPC::ChannelProxy::Send behavior.
-      DCHECK(proxy_task_runner_ &&
-             proxy_task_runner_->BelongsToCurrentThread());
+      DCHECK(proxy_task_runner_->BelongsToCurrentThread());
       std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
       message->MoveTo(passed_message.get());
       task_runner_->PostTask(
@@ -346,7 +417,7 @@
     }
 
     for (auto& endpoint : endpoints_to_notify) {
-      // Because an notification may in turn detach any endpoint, we have to
+      // Because a notification may in turn detach any endpoint, we have to
       // check each client again here.
       if (endpoint->client())
         NotifyEndpointOfError(endpoint.get(), false /* force_async */);
@@ -392,30 +463,6 @@
       endpoints_.erase(endpoint->id());
   }
 
-  void NotifyPeerEndpointClosed(mojo::InterfaceId id) {
-    if (task_runner_->BelongsToCurrentThread()) {
-      if (connector_.is_valid())
-        control_message_proxy_.NotifyPeerEndpointClosed(id);
-    } else {
-      task_runner_->PostTask(
-          FROM_HERE,
-          base::Bind(&ChannelAssociatedGroupController
-              ::NotifyPeerEndpointClosed, this, id));
-    }
-  }
-
-  void NotifyEndpointClosedBeforeSent(mojo::InterfaceId id) {
-    if (task_runner_->BelongsToCurrentThread()) {
-      if (connector_.is_valid())
-        control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
-    } else {
-      task_runner_->PostTask(
-          FROM_HERE,
-          base::Bind(&ChannelAssociatedGroupController
-              ::NotifyEndpointClosedBeforeSent, this, id));
-    }
-  }
-
   Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
     lock_.AssertAcquired();
     DCHECK(!inserted || !*inserted);
@@ -510,7 +557,7 @@
     if (inserted) {
       MarkClosedAndMaybeRemove(endpoint);
       if (!mojo::IsMasterInterfaceId(id))
-        NotifyPeerEndpointClosed(id);
+        control_message_proxy_.NotifyPeerEndpointClosed(id);
       return nullptr;
     }
 
@@ -527,6 +574,7 @@
     if (mojo::IsMasterInterfaceId(id))
       return false;
 
+    scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
     base::AutoLock locker(lock_);
     scoped_refptr<Endpoint> endpoint = FindOrInsertEndpoint(id, nullptr);
     if (!endpoint->peer_closed()) {
@@ -556,292 +604,92 @@
   base::ThreadChecker thread_checker_;
 
   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+
+  // A TaskRunner that runs tasks on the same thread as |task_runner_| but which
+  // is used exclusively to do thread safety checking. This is an unfortunate
+  // consequence of bad interaction between some TaskRunner implementations and
+  // MessageLoop destruction which may cause the user-provided |task_runner_| to
+  // incorrectly report that BelongsToCurrentThread() == false during shutdown.
+  scoped_refptr<base::SingleThreadTaskRunner> thread_task_runner_;
+
   scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
-  const uint32_t id_namespace_mask_;
-  std::unique_ptr<mojo::AssociatedGroup> associated_group_;
-  mojo::Connector connector_;
+  const bool set_interface_id_namespace_bit_;
+  std::unique_ptr<mojo::Connector> connector_;
   mojo::MessageHeaderValidator header_validator_;
   mojo::PipeControlMessageHandler control_message_handler_;
+  ControlMessageProxyThunk control_message_proxy_thunk_;
+  mojo::PipeControlMessageProxy control_message_proxy_;
+
+  // Outgoing messages that were sent before this controller was bound to a
+  // real message pipe.
+  std::vector<std::unique_ptr<mojo::Message>> outgoing_messages_;
 
   // Guards the fields below for thread-safe access.
   base::Lock lock_;
 
   bool encountered_error_ = false;
-  uint32_t next_interface_id_ = 1;
+
+  // ID #1 is reserved for the mojom::Channel interface.
+  uint32_t next_interface_id_ = 2;
+
   std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_;
-  mojo::PipeControlMessageProxy control_message_proxy_;
 
   DISALLOW_COPY_AND_ASSIGN(ChannelAssociatedGroupController);
 };
 
-class BootstrapMasterProxy {
+class MojoBootstrapImpl : public MojoBootstrap {
  public:
-  BootstrapMasterProxy() {}
-  ~BootstrapMasterProxy() {
-    endpoint_client_.reset();
-    proxy_.reset();
-    if (controller_)
-      controller_->ShutDown();
+  MojoBootstrapImpl(
+      mojo::ScopedMessagePipeHandle handle,
+      Delegate* delegate,
+      const scoped_refptr<ChannelAssociatedGroupController> controller)
+            : controller_(controller),
+        handle_(std::move(handle)),
+        delegate_(delegate) {
+    associated_group_ = controller_->CreateAssociatedGroup();
   }
 
-  void Bind(mojo::ScopedMessagePipeHandle handle) {
-    DCHECK(!controller_);
-    controller_ = new ChannelAssociatedGroupController(true, std::move(handle));
-    endpoint_client_.reset(new mojo::InterfaceEndpointClient(
-        controller_->CreateLocalEndpointHandle(mojo::kMasterInterfaceId),
-        nullptr,
-        base::MakeUnique<typename mojom::Bootstrap::ResponseValidator_>(),
-        false, base::ThreadTaskRunnerHandle::Get()));
-    proxy_.reset(new mojom::BootstrapProxy(endpoint_client_.get()));
-    proxy_->serialization_context()->group_controller = controller_;
-  }
-
-  void set_connection_error_handler(const base::Closure& handler) {
-    DCHECK(endpoint_client_);
-    endpoint_client_->set_connection_error_handler(handler);
-  }
-
-  mojo::AssociatedGroup* associated_group() {
-    DCHECK(controller_);
-    return controller_->associated_group();
-  }
-
-  ChannelAssociatedGroupController* controller() {
-    DCHECK(controller_);
-    return controller_.get();
-  }
-
-  mojom::Bootstrap* operator->() {
-    DCHECK(proxy_);
-    return proxy_.get();
+  ~MojoBootstrapImpl() override {
+    controller_->ShutDown();
   }
 
  private:
-  std::unique_ptr<mojom::BootstrapProxy> proxy_;
-  scoped_refptr<ChannelAssociatedGroupController> controller_;
-  std::unique_ptr<mojo::InterfaceEndpointClient> endpoint_client_;
+  // MojoBootstrap:
+  void Connect() override {
+    controller_->Bind(std::move(handle_));
 
-  DISALLOW_COPY_AND_ASSIGN(BootstrapMasterProxy);
-};
+    IPC::mojom::ChannelAssociatedPtr sender;
+    IPC::mojom::ChannelAssociatedRequest receiver;
+    controller_->CreateChannelEndpoints(&sender, &receiver);
 
-class BootstrapMasterBinding {
- public:
-  explicit BootstrapMasterBinding(mojom::Bootstrap* impl) {
-    stub_.set_sink(impl);
+    delegate_->OnPipesAvailable(std::move(sender), std::move(receiver));
   }
 
-  ~BootstrapMasterBinding() {
-    endpoint_client_.reset();
-    if (controller_)
-      controller_->ShutDown();
-  }
-
-  void set_connection_error_handler(const base::Closure& handler) {
-    DCHECK(endpoint_client_);
-    endpoint_client_->set_connection_error_handler(handler);
-  }
-
-  mojo::AssociatedGroup* associated_group() {
-    DCHECK(controller_);
-    return controller_->associated_group();
-  }
-
-  ChannelAssociatedGroupController* controller() {
-    DCHECK(controller_);
-    return controller_.get();
-  }
-
-  void Bind(mojo::ScopedMessagePipeHandle handle) {
-    DCHECK(!controller_);
-    controller_ =
-        new ChannelAssociatedGroupController(false, std::move(handle));
-    stub_.serialization_context()->group_controller = controller_;
-
-    endpoint_client_.reset(new mojo::InterfaceEndpointClient(
-        controller_->CreateLocalEndpointHandle(mojo::kMasterInterfaceId),
-        &stub_,
-        base::MakeUnique<typename mojom::Bootstrap::RequestValidator_>(),
-        false, base::ThreadTaskRunnerHandle::Get()));
-  }
-
- private:
-  mojom::BootstrapStub stub_;
-  scoped_refptr<ChannelAssociatedGroupController> controller_;
-  std::unique_ptr<mojo::InterfaceEndpointClient> endpoint_client_;
-
-  DISALLOW_COPY_AND_ASSIGN(BootstrapMasterBinding);
-};
-
-// MojoBootstrap for the server process. You should create the instance
-// using MojoBootstrap::Create().
-class MojoServerBootstrap : public MojoBootstrap {
- public:
-  MojoServerBootstrap();
-
- private:
-  // MojoBootstrap implementation.
-  void Connect() override;
-
   mojo::AssociatedGroup* GetAssociatedGroup() override {
-    return bootstrap_.associated_group();
+    return associated_group_.get();
   }
 
-  void SetProxyTaskRunner(
-      scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
-    bootstrap_.controller()->SetProxyTaskRunner(task_runner);
-  }
+  scoped_refptr<ChannelAssociatedGroupController> controller_;
 
-  void OnInitDone(int32_t peer_pid);
+  mojo::ScopedMessagePipeHandle handle_;
+  Delegate* delegate_;
+  std::unique_ptr<mojo::AssociatedGroup> associated_group_;
 
-  BootstrapMasterProxy bootstrap_;
-  IPC::mojom::ChannelAssociatedPtrInfo send_channel_;
-  IPC::mojom::ChannelAssociatedRequest receive_channel_request_;
-
-  DISALLOW_COPY_AND_ASSIGN(MojoServerBootstrap);
+  DISALLOW_COPY_AND_ASSIGN(MojoBootstrapImpl);
 };
 
-MojoServerBootstrap::MojoServerBootstrap() = default;
-
-void MojoServerBootstrap::Connect() {
-  DCHECK_EQ(state(), STATE_INITIALIZED);
-
-  bootstrap_.Bind(TakeHandle());
-  bootstrap_.set_connection_error_handler(
-      base::Bind(&MojoServerBootstrap::Fail, base::Unretained(this)));
-
-  IPC::mojom::ChannelAssociatedRequest send_channel_request;
-  IPC::mojom::ChannelAssociatedPtrInfo receive_channel;
-
-  bootstrap_.associated_group()->CreateAssociatedInterface(
-      mojo::AssociatedGroup::WILL_PASS_REQUEST, &send_channel_,
-      &send_channel_request);
-  bootstrap_.associated_group()->CreateAssociatedInterface(
-      mojo::AssociatedGroup::WILL_PASS_PTR, &receive_channel,
-      &receive_channel_request_);
-
-  bootstrap_->Init(
-      std::move(send_channel_request), std::move(receive_channel),
-      GetSelfPID(),
-      base::Bind(&MojoServerBootstrap::OnInitDone, base::Unretained(this)));
-
-  set_state(STATE_WAITING_ACK);
-}
-
-void MojoServerBootstrap::OnInitDone(int32_t peer_pid) {
-  if (state() != STATE_WAITING_ACK) {
-    set_state(STATE_ERROR);
-    LOG(ERROR) << "Got inconsistent message from client.";
-    return;
-  }
-
-  set_state(STATE_READY);
-  bootstrap_.set_connection_error_handler(base::Closure());
-  delegate()->OnPipesAvailable(std::move(send_channel_),
-                               std::move(receive_channel_request_), peer_pid);
-}
-
-// MojoBootstrap for client processes. You should create the instance
-// using MojoBootstrap::Create().
-class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap {
- public:
-  MojoClientBootstrap();
-
- private:
-  // MojoBootstrap implementation.
-  void Connect() override;
-
-  mojo::AssociatedGroup* GetAssociatedGroup() override {
-    return binding_.associated_group();
-  }
-
-  void SetProxyTaskRunner(
-      scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
-    binding_.controller()->SetProxyTaskRunner(task_runner);
-  }
-
-  // mojom::Bootstrap implementation.
-  void Init(mojom::ChannelAssociatedRequest receive_channel,
-            mojom::ChannelAssociatedPtrInfo send_channel,
-            int32_t peer_pid,
-            const InitCallback& callback) override;
-
-  BootstrapMasterBinding binding_;
-
-  DISALLOW_COPY_AND_ASSIGN(MojoClientBootstrap);
-};
-
-MojoClientBootstrap::MojoClientBootstrap() : binding_(this) {}
-
-void MojoClientBootstrap::Connect() {
-  binding_.Bind(TakeHandle());
-  binding_.set_connection_error_handler(
-      base::Bind(&MojoClientBootstrap::Fail, base::Unretained(this)));
-}
-
-void MojoClientBootstrap::Init(mojom::ChannelAssociatedRequest receive_channel,
-                               mojom::ChannelAssociatedPtrInfo send_channel,
-                               int32_t peer_pid,
-                               const InitCallback& callback) {
-  callback.Run(GetSelfPID());
-  set_state(STATE_READY);
-  binding_.set_connection_error_handler(base::Closure());
-  delegate()->OnPipesAvailable(std::move(send_channel),
-                               std::move(receive_channel), peer_pid);
-}
-
 }  // namespace
 
-// MojoBootstrap
-
 // static
 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
     mojo::ScopedMessagePipeHandle handle,
     Channel::Mode mode,
-    Delegate* delegate) {
-  CHECK(mode == Channel::MODE_CLIENT || mode == Channel::MODE_SERVER);
-  std::unique_ptr<MojoBootstrap> self =
-      mode == Channel::MODE_CLIENT
-          ? std::unique_ptr<MojoBootstrap>(new MojoClientBootstrap)
-          : std::unique_ptr<MojoBootstrap>(new MojoServerBootstrap);
-
-  self->Init(std::move(handle), delegate);
-  return self;
-}
-
-MojoBootstrap::MojoBootstrap() : delegate_(NULL), state_(STATE_INITIALIZED) {
-}
-
-MojoBootstrap::~MojoBootstrap() {}
-
-void MojoBootstrap::Init(mojo::ScopedMessagePipeHandle handle,
-                         Delegate* delegate) {
-  handle_ = std::move(handle);
-  delegate_ = delegate;
-}
-
-base::ProcessId MojoBootstrap::GetSelfPID() const {
-#if defined(OS_LINUX)
-  if (int global_pid = Channel::GetGlobalPid())
-    return global_pid;
-#endif  // OS_LINUX
-#if defined(OS_NACL)
-  return -1;
-#else
-  return base::GetCurrentProcId();
-#endif  // defined(OS_NACL)
-}
-
-void MojoBootstrap::Fail() {
-  set_state(STATE_ERROR);
-  delegate()->OnBootstrapError();
-}
-
-bool MojoBootstrap::HasFailed() const {
-  return state() == STATE_ERROR;
-}
-
-mojo::ScopedMessagePipeHandle MojoBootstrap::TakeHandle() {
-  return std::move(handle_);
+    Delegate* delegate,
+    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
+  return base::MakeUnique<MojoBootstrapImpl>(
+      std::move(handle), delegate,
+      new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
+                                           ipc_task_runner));
 }
 
 }  // namespace IPC
diff --git a/ipc/ipc_mojo_bootstrap.h b/ipc/ipc_mojo_bootstrap.h
index b9df408..f18857a 100644
--- a/ipc/ipc_mojo_bootstrap.h
+++ b/ipc/ipc_mojo_bootstrap.h
@@ -11,7 +11,6 @@
 
 #include "base/macros.h"
 #include "base/memory/ref_counted.h"
-#include "base/process/process_handle.h"
 #include "base/single_thread_task_runner.h"
 #include "build/build_config.h"
 #include "ipc/ipc.mojom.h"
@@ -34,57 +33,26 @@
  public:
   class Delegate {
    public:
-    virtual void OnPipesAvailable(
-        mojom::ChannelAssociatedPtrInfo send_channel,
-        mojom::ChannelAssociatedRequest receive_channel,
-        int32_t peer_pid) = 0;
-    virtual void OnBootstrapError() = 0;
+    virtual ~Delegate() {}
+
+    virtual void OnPipesAvailable(mojom::ChannelAssociatedPtr sender,
+                                  mojom::ChannelAssociatedRequest receiver) = 0;
   };
 
+  virtual ~MojoBootstrap() {}
+
   // Create the MojoBootstrap instance, using |handle| as the message pipe, in
   // mode as specified by |mode|. The result is passed to |delegate|.
   static std::unique_ptr<MojoBootstrap> Create(
       mojo::ScopedMessagePipeHandle handle,
       Channel::Mode mode,
-      Delegate* delegate);
-
-  MojoBootstrap();
-  virtual ~MojoBootstrap();
+      Delegate* delegate,
+      const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner);
 
   // Start the handshake over the underlying message pipe.
   virtual void Connect() = 0;
 
   virtual mojo::AssociatedGroup* GetAssociatedGroup() = 0;
-
-  virtual void SetProxyTaskRunner(
-      scoped_refptr<base::SingleThreadTaskRunner> task_runner) = 0;
-
-  // GetSelfPID returns our PID.
-  base::ProcessId GetSelfPID() const;
-
- protected:
-  // On MojoServerBootstrap: INITIALIZED -> WAITING_ACK -> READY
-  // On MojoClientBootstrap: INITIALIZED -> READY
-  // STATE_ERROR is a catch-all state that captures any observed error.
-  enum State { STATE_INITIALIZED, STATE_WAITING_ACK, STATE_READY, STATE_ERROR };
-
-  Delegate* delegate() const { return delegate_; }
-  void Fail();
-  bool HasFailed() const;
-
-  State state() const { return state_; }
-  void set_state(State state) { state_ = state; }
-
-  mojo::ScopedMessagePipeHandle TakeHandle();
-
- private:
-  void Init(mojo::ScopedMessagePipeHandle, Delegate* delegate);
-
-  mojo::ScopedMessagePipeHandle handle_;
-  Delegate* delegate_;
-  State state_;
-
-  DISALLOW_COPY_AND_ASSIGN(MojoBootstrap);
 };
 
 }  // namespace IPC
diff --git a/ipc/ipc_mojo_bootstrap_unittest.cc b/ipc/ipc_mojo_bootstrap_unittest.cc
index 876ce67..9d4aaf7 100644
--- a/ipc/ipc_mojo_bootstrap_unittest.cc
+++ b/ipc/ipc_mojo_bootstrap_unittest.cc
@@ -35,10 +35,9 @@
   explicit TestingDelegate(const base::Closure& quit_callback)
       : passed_(false), quit_callback_(quit_callback) {}
 
-  void OnPipesAvailable(IPC::mojom::ChannelAssociatedPtrInfo send_channel,
-                        IPC::mojom::ChannelAssociatedRequest receive_channel,
-                        int32_t peer_pid) override;
-  void OnBootstrapError() override;
+  void OnPipesAvailable(
+      IPC::mojom::ChannelAssociatedPtr sender,
+      IPC::mojom::ChannelAssociatedRequest receiver) override;
 
   bool passed() const { return passed_; }
 
@@ -48,24 +47,20 @@
 };
 
 void TestingDelegate::OnPipesAvailable(
-    IPC::mojom::ChannelAssociatedPtrInfo send_channel,
-    IPC::mojom::ChannelAssociatedRequest receive_channel,
-    int32_t peer_pid) {
+    IPC::mojom::ChannelAssociatedPtr sender,
+    IPC::mojom::ChannelAssociatedRequest receiver) {
   passed_ = true;
   quit_callback_.Run();
 }
 
-void TestingDelegate::OnBootstrapError() {
-  quit_callback_.Run();
-}
-
 TEST_F(IPCMojoBootstrapTest, Connect) {
   base::MessageLoop message_loop;
   base::RunLoop run_loop;
   TestingDelegate delegate(run_loop.QuitClosure());
   std::unique_ptr<IPC::MojoBootstrap> bootstrap = IPC::MojoBootstrap::Create(
       helper_.StartChild("IPCMojoBootstrapTestClient"),
-      IPC::Channel::MODE_SERVER, &delegate);
+      IPC::Channel::MODE_SERVER, &delegate,
+      base::ThreadTaskRunnerHandle::Get());
 
   bootstrap->Connect();
   run_loop.Run();
@@ -84,7 +79,8 @@
   std::unique_ptr<IPC::MojoBootstrap> bootstrap = IPC::MojoBootstrap::Create(
       mojo::edk::CreateChildMessagePipe(
           mojo::edk::test::MultiprocessTestHelper::primordial_pipe_token),
-      IPC::Channel::MODE_CLIENT, &delegate);
+      IPC::Channel::MODE_CLIENT, &delegate,
+      base::ThreadTaskRunnerHandle::Get());
 
   bootstrap->Connect();
 
diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
index e1f388a..9ececaa 100644
--- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
+++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
@@ -159,7 +159,8 @@
 InterfaceEndpointClient::~InterfaceEndpointClient() {
   DCHECK(thread_checker_.CalledOnValidThread());
 
-  handle_.group_controller()->DetachEndpointClient(handle_);
+  if (handle_.is_valid())
+    handle_.group_controller()->DetachEndpointClient(handle_);
 }
 
 AssociatedGroup* InterfaceEndpointClient::associated_group() {