Mojo EDK: Scoped peer process connections

Encloses the ad hoc ConnectToPeerProcess and ClosePeerConnection APIs
within a new PeerConnection scoper which implicitly controls the
lifetime of the connection.

In addition to being safer, this new API plumbs through ConnectionParams
for peer process connections.

This eliminates the remaining legacy EDK IPC API surface.

BUG=696031
TBR=jam@chromium.org

Change-Id: I1795e0328fd0e8242fb905ab0437dbc2985e6746
Reviewed-on: https://chromium-review.googlesource.com/505714
Commit-Queue: Ken Rockot <rockot@chromium.org>
Reviewed-by: Jay Civelli <jcivelli@chromium.org>
Cr-Commit-Position: refs/heads/master@{#472973}

CrOS-Libchrome-Original-Commit: 47f4e89ec7a4477a18c12595cca6d2aa46231f78
diff --git a/mojo/edk/embedder/embedder.cc b/mojo/edk/embedder/embedder.cc
index fa1994f..0554b4d 100644
--- a/mojo/edk/embedder/embedder.cc
+++ b/mojo/edk/embedder/embedder.cc
@@ -134,22 +134,5 @@
 }
 #endif
 
-// Legacy IPC Helpers ----------------------------------------------------------
-
-ScopedMessagePipeHandle ConnectToPeerProcess(ScopedPlatformHandle pipe) {
-  return ConnectToPeerProcess(std::move(pipe), GenerateRandomToken());
-}
-
-ScopedMessagePipeHandle ConnectToPeerProcess(ScopedPlatformHandle pipe,
-                                             const std::string& peer_token) {
-  DCHECK(pipe.is_valid());
-  DCHECK(!peer_token.empty());
-  return internal::g_core->ConnectToPeerProcess(std::move(pipe), peer_token);
-}
-
-void ClosePeerConnection(const std::string& peer_token) {
-  return internal::g_core->ClosePeerConnection(peer_token);
-}
-
 }  // namespace edk
 }  // namespace mojo
diff --git a/mojo/edk/embedder/embedder.h b/mojo/edk/embedder/embedder.h
index 8951f8f..460de87 100644
--- a/mojo/edk/embedder/embedder.h
+++ b/mojo/edk/embedder/embedder.h
@@ -138,27 +138,6 @@
     base::PortProvider* port_provider);
 #endif
 
-// Legacy IPC Helpers ----------------------------------------------------------
-
-// Called to connect to a peer process. This should be called only if there
-// is no common ancestor for the processes involved within this mojo system.
-// Both processes must call this function, each passing one end of a platform
-// channel. This returns one end of a message pipe to each process.
-MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
-ConnectToPeerProcess(ScopedPlatformHandle pipe);
-
-// Called to connect to a peer process. This should be called only if there
-// is no common ancestor for the processes involved within this mojo system.
-// Both processes must call this function, each passing one end of a platform
-// channel. This returns one end of a message pipe to each process. |peer_token|
-// may be passed to ClosePeerConnection() to close the connection.
-MOJO_SYSTEM_IMPL_EXPORT ScopedMessagePipeHandle
-ConnectToPeerProcess(ScopedPlatformHandle pipe, const std::string& peer_token);
-
-// Closes a connection to a peer process created by ConnectToPeerProcess()
-// where the same |peer_token| was used.
-MOJO_SYSTEM_IMPL_EXPORT void ClosePeerConnection(const std::string& peer_token);
-
 }  // namespace edk
 }  // namespace mojo
 
diff --git a/mojo/edk/embedder/embedder_unittest.cc b/mojo/edk/embedder/embedder_unittest.cc
index a59e041..7c41bc9 100644
--- a/mojo/edk/embedder/embedder_unittest.cc
+++ b/mojo/edk/embedder/embedder_unittest.cc
@@ -28,6 +28,7 @@
 #include "mojo/edk/embedder/named_platform_handle.h"
 #include "mojo/edk/embedder/named_platform_handle_utils.h"
 #include "mojo/edk/embedder/outgoing_broker_client_invitation.h"
+#include "mojo/edk/embedder/peer_connection.h"
 #include "mojo/edk/embedder/platform_channel_pair.h"
 #include "mojo/edk/embedder/test_embedder.h"
 #include "mojo/edk/system/test_utils.h"
@@ -510,9 +511,12 @@
 TEST_F(EmbedderTest, ClosePendingPeerConnection) {
   NamedPlatformHandle named_handle = GenerateChannelName();
   std::string peer_token = GenerateRandomToken();
+
+  auto peer_connection = base::MakeUnique<PeerConnection>();
   ScopedMessagePipeHandle server_pipe =
-      ConnectToPeerProcess(CreateServerHandle(named_handle), peer_token);
-  ClosePeerConnection(peer_token);
+      peer_connection->Connect(ConnectionParams(
+          TransportProtocol::kLegacy, CreateServerHandle(named_handle)));
+  peer_connection.reset();
   EXPECT_EQ(MOJO_RESULT_OK,
             Wait(server_pipe.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED));
   base::MessageLoop message_loop;
diff --git a/mojo/edk/embedder/peer_connection.cc b/mojo/edk/embedder/peer_connection.cc
new file mode 100644
index 0000000..77b2648
--- /dev/null
+++ b/mojo/edk/embedder/peer_connection.cc
@@ -0,0 +1,32 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/embedder/peer_connection.h"
+
+#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/system/core.h"
+
+namespace mojo {
+namespace edk {
+
+PeerConnection::PeerConnection() = default;
+
+PeerConnection::~PeerConnection() {
+  if (is_connected_)
+    internal::g_core->ClosePeerConnection(connection_id_);
+}
+
+ScopedMessagePipeHandle PeerConnection::Connect(ConnectionParams params) {
+  DCHECK(!is_connected_);
+  is_connected_ = true;
+
+  ports::PortRef peer_port;
+  auto pipe = internal::g_core->CreatePartialMessagePipe(&peer_port);
+  connection_id_ =
+      internal::g_core->ConnectToPeer(std::move(params), peer_port);
+  return pipe;
+}
+
+}  // namespace edk
+}  // namespace mojo
diff --git a/mojo/edk/embedder/peer_connection.h b/mojo/edk/embedder/peer_connection.h
new file mode 100644
index 0000000..f17fc48
--- /dev/null
+++ b/mojo/edk/embedder/peer_connection.h
@@ -0,0 +1,54 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_EDK_EMBEDDER_PEER_CONNECTION_H_
+#define MOJO_EDK_EMBEDDER_PEER_CONNECTION_H_
+
+#include "base/macros.h"
+#include "mojo/edk/embedder/connection_params.h"
+#include "mojo/edk/system/system_impl_export.h"
+#include "mojo/public/cpp/system/message_pipe.h"
+
+namespace mojo {
+namespace edk {
+
+// Used to connect to a peer process.
+//
+// NOTE: This should ONLY be used if there is no common ancestor for the
+// processes being connected. Peer connections have limited capabilities with
+// respect to Mojo IPC when compared to standard broker client connections (see
+// OutgoingBrokerClientInvitation and IncomingBrokerClientInvitation), and in
+// particular it's undefined behavior to attempt to forward any resources
+// (message pipes or other system handles) received from a peer process over to
+// any other process to which you're connected.
+//
+// Both processes must construct a PeerConnection with each one corresponding to
+// one end of some shared connection medium (e.g. a platform channel.)
+//
+// Each PeerConnection gets an implicit cross-process message pipe, the local
+// endpoint of which may be acquired by a one-time call to TakeMessagePipe().
+//
+// Once established, the connection to the remote peer will remain valid as long
+// as each process keeps its respective PeerConnection object alive.
+class MOJO_SYSTEM_IMPL_EXPORT PeerConnection {
+ public:
+  // Constructs a disconnected connection.
+  PeerConnection();
+  ~PeerConnection();
+
+  // Connects to the peer and returns a primordial message pipe handle which
+  // will be connected to a corresponding peer pipe in the remote process.
+  ScopedMessagePipeHandle Connect(ConnectionParams params);
+
+ private:
+  bool is_connected_ = false;
+  uint64_t connection_id_ = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(PeerConnection);
+};
+
+}  // namespace edk
+}  // namespace mojo
+
+#endif  // MOJO_EDK_EMBEDDER_PEER_CONNECTION_H_
diff --git a/mojo/edk/system/core.cc b/mojo/edk/system/core.cc
index 5ee1020..c0b8be2 100644
--- a/mojo/edk/system/core.cc
+++ b/mojo/edk/system/core.cc
@@ -192,23 +192,13 @@
       std::move(connection_params));
 }
 
-ScopedMessagePipeHandle Core::ConnectToPeerProcess(
-    ScopedPlatformHandle pipe_handle,
-    const std::string& peer_token) {
-  RequestContext request_context;
-  ports::PortRef port0, port1;
-  GetNodeController()->node()->CreatePortPair(&port0, &port1);
-  MojoHandle handle = AddDispatcher(new MessagePipeDispatcher(
-      GetNodeController(), port0, kUnknownPipeIdForDebug, 0));
-  ConnectionParams connection_params(TransportProtocol::kLegacy,
-                                     std::move(pipe_handle));
-  GetNodeController()->ConnectToPeer(std::move(connection_params), port1,
-                                     peer_token);
-  return ScopedMessagePipeHandle(MessagePipeHandle(handle));
+uint64_t Core::ConnectToPeer(ConnectionParams connection_params,
+                             const ports::PortRef& port) {
+  return GetNodeController()->ConnectToPeer(std::move(connection_params), port);
 }
 
-void Core::ClosePeerConnection(const std::string& peer_token) {
-  GetNodeController()->ClosePeerConnection(peer_token);
+void Core::ClosePeerConnection(uint64_t peer_connection_id) {
+  GetNodeController()->ClosePeerConnection(peer_connection_id);
 }
 
 void Core::SetMachPortProvider(base::PortProvider* port_provider) {
diff --git a/mojo/edk/system/core.h b/mojo/edk/system/core.h
index 8992721..c2d0aa7 100644
--- a/mojo/edk/system/core.h
+++ b/mojo/edk/system/core.h
@@ -96,10 +96,14 @@
   // Called to connect to a peer process. This should be called only if there
   // is no common ancestor for the processes involved within this mojo system.
   // Both processes must call this function, each passing one end of a platform
-  // channel. This returns one end of a message pipe to each process.
-  ScopedMessagePipeHandle ConnectToPeerProcess(ScopedPlatformHandle pipe_handle,
-                                               const std::string& peer_token);
-  void ClosePeerConnection(const std::string& peer_token);
+  // channel. |port| is a port to be merged with the remote peer's port, which
+  // it will provide via the same API.
+  //
+  // Returns an ID which can be later used to close the connection via
+  // ClosePeerConnection().
+  uint64_t ConnectToPeer(ConnectionParams connection_params,
+                         const ports::PortRef& port);
+  void ClosePeerConnection(uint64_t peer_connection_id);
 
   // Sets the mach port provider for this process.
   void SetMachPortProvider(base::PortProvider* port_provider);
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
index 0901022..7087f0b 100644
--- a/mojo/edk/system/node_controller.cc
+++ b/mojo/edk/system/node_controller.cc
@@ -203,12 +203,6 @@
                      process_error_callback));
 }
 
-void NodeController::ClosePeerConnection(const std::string& peer_token) {
-  io_task_runner_->PostTask(
-      FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
-                            base::Unretained(this), peer_token));
-}
-
 void NodeController::AcceptBrokerClientInvitation(
     ConnectionParams connection_params) {
   DCHECK(!GetConfiguration().is_broker_process);
@@ -239,16 +233,24 @@
                      base::Unretained(this), std::move(connection_params)));
 }
 
-void NodeController::ConnectToPeer(ConnectionParams connection_params,
-                                   const ports::PortRef& port,
-                                   const std::string& peer_token) {
-  ports::NodeName node_name;
-  GenerateRandomName(&node_name);
+uint64_t NodeController::ConnectToPeer(ConnectionParams connection_params,
+                                       const ports::PortRef& port) {
+  uint64_t id = 0;
+  {
+    base::AutoLock lock(peers_lock_);
+    id = next_peer_connection_id_++;
+  }
+  io_task_runner_->PostTask(FROM_HERE,
+                            base::Bind(&NodeController::ConnectToPeerOnIOThread,
+                                       base::Unretained(this), id,
+                                       base::Passed(&connection_params), port));
+  return id;
+}
+
+void NodeController::ClosePeerConnection(uint64_t peer_connection_id) {
   io_task_runner_->PostTask(
-      FROM_HERE,
-      base::Bind(&NodeController::ConnectToPeerOnIOThread,
-                 base::Unretained(this), base::Passed(&connection_params),
-                 node_name, port, peer_token));
+      FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
+                            base::Unretained(this), peer_connection_id));
 }
 
 void NodeController::SetPortObserver(const ports::PortRef& port,
@@ -414,17 +416,19 @@
   bootstrap_parent_channel_->Start();
 }
 
-void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params,
-                                             ports::NodeName token,
-                                             ports::PortRef port,
-                                             const std::string& peer_token) {
+void NodeController::ConnectToPeerOnIOThread(uint64_t peer_connection_id,
+                                             ConnectionParams connection_params,
+                                             ports::PortRef port) {
   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 
   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
       this, std::move(connection_params), io_task_runner_, {});
-  peer_connections_.insert(
-      {token, PeerConnection{channel, port, peer_token}});
-  peers_by_token_.insert({peer_token, token});
+
+  ports::NodeName token;
+  GenerateRandomName(&token);
+  peer_connections_.emplace(token,
+                            PeerConnection{channel, port, peer_connection_id});
+  peer_connections_by_id_.emplace(peer_connection_id, token);
 
   channel->SetRemoteNodeName(token);
   channel->Start();
@@ -433,11 +437,11 @@
 }
 
 void NodeController::ClosePeerConnectionOnIOThread(
-    const std::string& peer_token) {
+    uint64_t peer_connection_id) {
   RequestContext request_context(RequestContext::Source::SYSTEM);
-  auto peer = peers_by_token_.find(peer_token);
+  auto peer = peer_connections_by_id_.find(peer_connection_id);
   // The connection may already be closed.
-  if (peer == peers_by_token_.end())
+  if (peer == peer_connections_by_id_.end())
     return;
 
   // |peer| may be removed so make a copy of |name|.
@@ -567,7 +571,7 @@
 
   auto peer = peer_connections_.find(name);
   if (peer != peer_connections_.end()) {
-    peers_by_token_.erase(peer->second.peer_token);
+    peer_connections_by_id_.erase(peer->second.connection_id);
     ports_to_close.push_back(peer->second.local_port);
     peer_connections_.erase(peer);
   }
@@ -1298,18 +1302,18 @@
 
   scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
   ports::PortRef local_port = it->second.local_port;
-  std::string peer_token = std::move(it->second.peer_token);
+  uint64_t peer_connection_id = it->second.connection_id;
   peer_connections_.erase(it);
   DCHECK(channel);
 
-  // If the peer connection is a self connection (which is used in tests),
-  // drop the channel to it and skip straight to merging the ports.
   if (name_ == peer_name) {
-    peers_by_token_.erase(peer_token);
+    // If the peer connection is a self connection (which is used in tests),
+    // drop the channel to it and skip straight to merging the ports.
+    peer_connections_by_id_.erase(peer_connection_id);
   } else {
-    peers_by_token_[peer_token] = peer_name;
-    peer_connections_.insert(
-        {peer_name, PeerConnection{nullptr, local_port, peer_token}});
+    peer_connections_by_id_[peer_connection_id] = peer_name;
+    peer_connections_.emplace(
+        peer_name, PeerConnection{nullptr, local_port, peer_connection_id});
     DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
 
     AddPeer(peer_name, channel, false /* start_channel */);
@@ -1318,9 +1322,8 @@
   // We need to choose one side to initiate the port merge. It doesn't matter
   // who does it as long as they don't both try. Simple solution: pick the one
   // with the "smaller" port name.
-  if (local_port.name() < port_name) {
+  if (local_port.name() < port_name)
     node()->MergePorts(local_port, peer_name, port_name);
-  }
 }
 
 void NodeController::OnChannelError(const ports::NodeName& from_node,
@@ -1407,10 +1410,10 @@
 NodeController::PeerConnection::PeerConnection(
     scoped_refptr<NodeChannel> channel,
     const ports::PortRef& local_port,
-    const std::string& peer_token)
+    uint64_t connection_id)
     : channel(std::move(channel)),
       local_port(local_port),
-      peer_token(peer_token) {}
+      connection_id(connection_id) {}
 
 NodeController::PeerConnection::~PeerConnection() = default;
 
diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h
index 818f1ec..f5eed33 100644
--- a/mojo/edk/system/node_controller.h
+++ b/mojo/edk/system/node_controller.h
@@ -81,17 +81,17 @@
       const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
       const ProcessErrorCallback& process_error_callback);
 
-  // Close a connection to a peer associated with |peer_token|.
-  void ClosePeerConnection(const std::string& peer_token);
-
   // Connects this node to the process which invited it to be a broker client.
   void AcceptBrokerClientInvitation(ConnectionParams connection_params);
 
   // Connects this node to a peer node. On success, |port| will be merged with
-  // the corresponding port in the peer node.
-  void ConnectToPeer(ConnectionParams connection_params,
-                     const ports::PortRef& port,
-                     const std::string& peer_token);
+  // the corresponding port in the peer node. Returns an ID that can be used to
+  // later close the connection with a call to ClosePeerConnection().
+  uint64_t ConnectToPeer(ConnectionParams connection_params,
+                         const ports::PortRef& port);
+
+  // Close a connection to a peer associated with |peer_connection_id|.
+  void ClosePeerConnection(uint64_t peer_connection_id);
 
   // Sets a port's observer. If |observer| is null the port's current observer
   // is removed.
@@ -143,16 +143,16 @@
     PeerConnection(PeerConnection&& other);
     PeerConnection(scoped_refptr<NodeChannel> channel,
                    const ports::PortRef& local_port,
-                   const std::string& peer_token);
+                   uint64_t connection_id);
     ~PeerConnection();
 
     PeerConnection& operator=(const PeerConnection& other);
     PeerConnection& operator=(PeerConnection&& other);
 
-
+    // NOTE: |channel| is null once the connection is fully established.
     scoped_refptr<NodeChannel> channel;
     ports::PortRef local_port;
-    std::string peer_token;
+    uint64_t connection_id;
   };
 
   void SendBrokerClientInvitationOnIOThread(
@@ -163,11 +163,10 @@
   void AcceptBrokerClientInvitationOnIOThread(
       ConnectionParams connection_params);
 
-  void ConnectToPeerOnIOThread(ConnectionParams connection_params,
-                               ports::NodeName token,
-                               ports::PortRef port,
-                               const std::string& peer_token);
-  void ClosePeerConnectionOnIOThread(const std::string& node_name);
+  void ConnectToPeerOnIOThread(uint64_t peer_connection_id,
+                               ConnectionParams connection_params,
+                               ports::PortRef port);
+  void ClosePeerConnectionOnIOThread(uint64_t peer_connection_id);
 
   scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
   scoped_refptr<NodeChannel> GetParentChannel();
@@ -260,12 +259,15 @@
   const std::unique_ptr<ports::Node> node_;
   scoped_refptr<base::TaskRunner> io_task_runner_;
 
-  // Guards |peers_| and |pending_peer_messages_|.
+  // Guards |peers_|, |pending_peer_messages_|, and |next_peer_connection_id_|.
   base::Lock peers_lock_;
 
   // Channels to known peers, including parent and children, if any.
   NodeMap peers_;
 
+  // A unique ID generator for peer connections.
+  uint64_t next_peer_connection_id_ = 1;
+
   // Outgoing message queues for peers we've heard of but can't yet talk to.
   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
       pending_peer_messages_;
@@ -334,12 +336,10 @@
   // Channels to invitees during handshake.
   NodeMap pending_invitations_;
 
-  using PeerNodeMap =
-      std::unordered_map<ports::NodeName, PeerConnection>;
-  PeerNodeMap peer_connections_;
+  std::map<ports::NodeName, PeerConnection> peer_connections_;
 
   // Maps from peer token to node name, pending or not.
-  std::unordered_map<std::string, ports::NodeName> peers_by_token_;
+  std::unordered_map<uint64_t, ports::NodeName> peer_connections_by_id_;
 
   // Indicates whether this object should delete itself on IO thread shutdown.
   // Must only be accessed from the IO thread.
diff --git a/mojo/edk/test/multiprocess_test_helper.cc b/mojo/edk/test/multiprocess_test_helper.cc
index d3c22a9..049dc6e 100644
--- a/mojo/edk/test/multiprocess_test_helper.cc
+++ b/mojo/edk/test/multiprocess_test_helper.cc
@@ -13,6 +13,7 @@
 #include "base/bind.h"
 #include "base/command_line.h"
 #include "base/files/file_path.h"
+#include "base/lazy_instance.h"
 #include "base/logging.h"
 #include "base/memory/ref_counted.h"
 #include "base/path_service.h"
@@ -28,6 +29,7 @@
 #include "mojo/edk/embedder/named_platform_handle.h"
 #include "mojo/edk/embedder/named_platform_handle_utils.h"
 #include "mojo/edk/embedder/outgoing_broker_client_invitation.h"
+#include "mojo/edk/embedder/peer_connection.h"
 #include "mojo/edk/embedder/platform_channel_pair.h"
 #include "testing/gtest/include/gtest/gtest.h"
 
@@ -48,6 +50,9 @@
 
 const char kTestChildMessagePipeName[] = "test_pipe";
 
+// For use (and only valid) in a test child process:
+base::LazyInstance<PeerConnection>::Leaky g_child_peer_connection;
+
 template <typename Func>
 int RunClientFunction(Func handler, bool pass_pipe_ownership_to_main) {
   CHECK(MultiprocessTestHelper::primordial_pipe.is_valid());
@@ -160,8 +165,9 @@
     command_line.AppendSwitch(kRunAsBrokerClient);
   } else if (launch_type == LaunchType::PEER ||
              launch_type == LaunchType::NAMED_PEER) {
-    peer_token_ = mojo::edk::GenerateRandomToken();
-    pipe = ConnectToPeerProcess(std::move(server_handle), peer_token_);
+    peer_connection_ = base::MakeUnique<PeerConnection>();
+    pipe = peer_connection_->Connect(
+        ConnectionParams(TransportProtocol::kLegacy, std::move(server_handle)));
   }
 
   test_child_ =
@@ -193,9 +199,8 @@
 }
 
 void MultiprocessTestHelper::ClosePeerConnection() {
-  DCHECK(!peer_token_.empty());
-  ::mojo::edk::ClosePeerConnection(peer_token_);
-  peer_token_.clear();
+  DCHECK(peer_connection_);
+  peer_connection_.reset();
 }
 
 bool MultiprocessTestHelper::WaitForChildTestShutdown() {
@@ -224,11 +229,13 @@
     primordial_pipe = invitation->ExtractMessagePipe(kTestChildMessagePipeName);
   } else {
     if (named_pipe.is_valid()) {
-      primordial_pipe = ConnectToPeerProcess(CreateClientHandle(named_pipe));
+      primordial_pipe = g_child_peer_connection.Get().Connect(ConnectionParams(
+          TransportProtocol::kLegacy, CreateClientHandle(named_pipe)));
     } else {
-      primordial_pipe = ConnectToPeerProcess(
+      primordial_pipe = g_child_peer_connection.Get().Connect(ConnectionParams(
+          TransportProtocol::kLegacy,
           PlatformChannelPair::PassClientHandleFromParentProcess(
-              *base::CommandLine::ForCurrentProcess()));
+              *base::CommandLine::ForCurrentProcess())));
     }
   }
 }
diff --git a/mojo/edk/test/multiprocess_test_helper.h b/mojo/edk/test/multiprocess_test_helper.h
index dc1c9bc..f070890 100644
--- a/mojo/edk/test/multiprocess_test_helper.h
+++ b/mojo/edk/test/multiprocess_test_helper.h
@@ -20,6 +20,8 @@
 
 namespace edk {
 
+class PeerConnection;
+
 namespace test {
 
 class MultiprocessTestHelper {
@@ -98,7 +100,7 @@
 
   ProcessErrorCallback process_error_callback_;
 
-  std::string peer_token_;
+  std::unique_ptr<PeerConnection> peer_connection_;
 
   DISALLOW_COPY_AND_ASSIGN(MultiprocessTestHelper);
 };