[PeerConnection] Implement asynchronous version of AddIceCandidate().

This is the same as the existing version, except it uses the Operations
Chain. As such, if an asynchronous operation that uses the chain is
currently pending, such as CreateOffer() or CreateAnswer(),
AddIceCandidate() will not happen until the previous operation
completes.

Bug: chromium:1019222
Change-Id: Ie6e5fc386fa9c29b5e2f8e3f65bfbaf9837d351c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158741
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29704}
diff --git a/api/peer_connection_interface.h b/api/peer_connection_interface.h
index 7567ab1..52422c0 100644
--- a/api/peer_connection_interface.h
+++ b/api/peer_connection_interface.h
@@ -1039,7 +1039,14 @@
   // A copy of the |candidate| will be created and added to the remote
   // description. So the caller of this method still has the ownership of the
   // |candidate|.
+  // TODO(hbos): The spec mandates chaining this operation onto the operations
+  // chain; deprecate and remove this version in favor of the callback-based
+  // signature.
   virtual bool AddIceCandidate(const IceCandidateInterface* candidate) = 0;
+  // TODO(hbos): Remove default implementation once implemented by downstream
+  // projects.
+  virtual void AddIceCandidate(std::unique_ptr<IceCandidateInterface> candidate,
+                               std::function<void(RTCError)> callback) {}
 
   // Removes a group of remote candidates from the ICE agent. Needed mainly for
   // continual gathering, to avoid an ever-growing list of candidates as
diff --git a/api/peer_connection_proxy.h b/api/peer_connection_proxy.h
index 3b9cf79..1b4ceea 100644
--- a/api/peer_connection_proxy.h
+++ b/api/peer_connection_proxy.h
@@ -114,6 +114,10 @@
               SetConfiguration,
               const PeerConnectionInterface::RTCConfiguration&)
 PROXY_METHOD1(bool, AddIceCandidate, const IceCandidateInterface*)
+PROXY_METHOD2(void,
+              AddIceCandidate,
+              std::unique_ptr<IceCandidateInterface>,
+              std::function<void(RTCError)>)
 PROXY_METHOD1(bool, RemoveIceCandidates, const std::vector<cricket::Candidate>&)
 PROXY_METHOD1(RTCError, SetBitrate, const BitrateSettings&)
 PROXY_METHOD1(void, SetAudioPlayout, bool)
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 46a61ab..d09b9c5 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -4209,6 +4209,37 @@
   }
 }
 
+void PeerConnection::AddIceCandidate(
+    std::unique_ptr<IceCandidateInterface> candidate,
+    std::function<void(RTCError)> callback) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  // Chain this operation. If asynchronous operations are pending on the chain,
+  // this operation will be queued to be invoked, otherwise the contents of the
+  // lambda will execute immediately.
+  operations_chain_->ChainOperation(
+      [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
+       candidate = std::move(candidate), callback = std::move(callback)](
+          std::function<void()> operations_chain_callback) {
+        if (!this_weak_ptr) {
+          operations_chain_callback();
+          callback(RTCError(
+              RTCErrorType::INVALID_STATE,
+              "AddIceCandidate failed because the session was shut down"));
+          return;
+        }
+        if (!this_weak_ptr->AddIceCandidate(candidate.get())) {
+          operations_chain_callback();
+          // Fail with an error type and message consistent with Chromium.
+          // TODO(hbos): Fail with error types according to spec.
+          callback(RTCError(RTCErrorType::UNSUPPORTED_OPERATION,
+                            "Error processing ICE candidate"));
+          return;
+        }
+        operations_chain_callback();
+        callback(RTCError::OK());
+      });
+}
+
 bool PeerConnection::RemoveIceCandidates(
     const std::vector<cricket::Candidate>& candidates) {
   TRACE_EVENT0("webrtc", "PeerConnection::RemoveIceCandidates");
diff --git a/pc/peer_connection.h b/pc/peer_connection.h
index dea05ac..302ff3b 100644
--- a/pc/peer_connection.h
+++ b/pc/peer_connection.h
@@ -221,6 +221,8 @@
   RTCError SetConfiguration(
       const PeerConnectionInterface::RTCConfiguration& configuration) override;
   bool AddIceCandidate(const IceCandidateInterface* candidate) override;
+  void AddIceCandidate(std::unique_ptr<IceCandidateInterface> candidate,
+                       std::function<void(RTCError)> callback) override;
   bool RemoveIceCandidates(
       const std::vector<cricket::Candidate>& candidates) override;
 
diff --git a/pc/peer_connection_ice_unittest.cc b/pc/peer_connection_ice_unittest.cc
index 61034d0..18a053c 100644
--- a/pc/peer_connection_ice_unittest.cc
+++ b/pc/peer_connection_ice_unittest.cc
@@ -28,6 +28,7 @@
 #include "api/video_codecs/builtin_video_decoder_factory.h"
 #include "api/video_codecs/builtin_video_encoder_factory.h"
 #include "pc/test/fake_audio_capture_module.h"
+#include "pc/test/mock_peer_connection_observers.h"
 #include "rtc_base/fake_network.h"
 #include "rtc_base/gunit.h"
 #include "rtc_base/strings/string_builder.h"
@@ -46,21 +47,26 @@
 using ::testing::Values;
 
 constexpr int kIceCandidatesTimeout = 10000;
+constexpr int64_t kWaitTimeout = 10000;
 
 class PeerConnectionWrapperForIceTest : public PeerConnectionWrapper {
  public:
   using PeerConnectionWrapper::PeerConnectionWrapper;
 
-  // Adds a new ICE candidate to the first transport.
-  bool AddIceCandidate(cricket::Candidate* candidate) {
+  std::unique_ptr<IceCandidateInterface> CreateJsepCandidateForFirstTransport(
+      cricket::Candidate* candidate) {
     RTC_DCHECK(pc()->remote_description());
     const auto* desc = pc()->remote_description()->description();
     RTC_DCHECK(desc->contents().size() > 0);
     const auto& first_content = desc->contents()[0];
     candidate->set_transport_name(first_content.name);
-    std::unique_ptr<IceCandidateInterface> jsep_candidate =
-        CreateIceCandidate(first_content.name, -1, *candidate);
-    return pc()->AddIceCandidate(jsep_candidate.get());
+    return CreateIceCandidate(first_content.name, -1, *candidate);
+  }
+
+  // Adds a new ICE candidate to the first transport.
+  bool AddIceCandidate(cricket::Candidate* candidate) {
+    return pc()->AddIceCandidate(
+        CreateJsepCandidateForFirstTransport(candidate).get());
   }
 
   // Returns ICE candidates from the remote session description.
@@ -691,6 +697,130 @@
                       candidates[1]->candidate());
 }
 
+TEST_P(PeerConnectionIceTest, AsyncAddIceCandidateIsAddedToRemoteDescription) {
+  auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111));
+
+  auto caller = CreatePeerConnectionWithAudioVideo();
+  auto callee = CreatePeerConnectionWithAudioVideo();
+
+  ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal()));
+
+  auto jsep_candidate =
+      callee->CreateJsepCandidateForFirstTransport(&candidate);
+  bool operation_completed = false;
+  callee->pc()->AddIceCandidate(std::move(jsep_candidate),
+                                [&operation_completed](RTCError result) {
+                                  EXPECT_TRUE(result.ok());
+                                  operation_completed = true;
+                                });
+  EXPECT_TRUE_WAIT(operation_completed, kWaitTimeout);
+
+  auto candidates = callee->GetIceCandidatesFromRemoteDescription();
+  ASSERT_EQ(1u, candidates.size());
+  EXPECT_PRED_FORMAT2(AssertCandidatesEqual, candidate,
+                      candidates[0]->candidate());
+}
+
+TEST_P(PeerConnectionIceTest,
+       AsyncAddIceCandidateCompletesImmediatelyIfNoPendingOperation) {
+  auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111));
+
+  auto caller = CreatePeerConnectionWithAudioVideo();
+  auto callee = CreatePeerConnectionWithAudioVideo();
+
+  ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal()));
+
+  auto jsep_candidate =
+      callee->CreateJsepCandidateForFirstTransport(&candidate);
+  bool operation_completed = false;
+  callee->pc()->AddIceCandidate(
+      std::move(jsep_candidate),
+      [&operation_completed](RTCError result) { operation_completed = true; });
+  EXPECT_TRUE(operation_completed);
+}
+
+TEST_P(PeerConnectionIceTest,
+       AsyncAddIceCandidateCompletesWhenPendingOperationCompletes) {
+  auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111));
+
+  auto caller = CreatePeerConnectionWithAudioVideo();
+  auto callee = CreatePeerConnectionWithAudioVideo();
+
+  ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal()));
+
+  // Chain an operation that will block AddIceCandidate() from executing.
+  rtc::scoped_refptr<MockCreateSessionDescriptionObserver> answer_observer(
+      new rtc::RefCountedObject<MockCreateSessionDescriptionObserver>());
+  callee->pc()->CreateAnswer(answer_observer, RTCOfferAnswerOptions());
+
+  auto jsep_candidate =
+      callee->CreateJsepCandidateForFirstTransport(&candidate);
+  bool operation_completed = false;
+  callee->pc()->AddIceCandidate(
+      std::move(jsep_candidate),
+      [&operation_completed](RTCError result) { operation_completed = true; });
+  // The operation will not be able to complete until we EXPECT_TRUE_WAIT()
+  // allowing CreateAnswer() to complete.
+  EXPECT_FALSE(operation_completed);
+  EXPECT_TRUE_WAIT(answer_observer->called(), kWaitTimeout);
+  // As soon as it does, AddIceCandidate() will execute without delay, so it
+  // must also have completed.
+  EXPECT_TRUE(operation_completed);
+}
+
+TEST_P(PeerConnectionIceTest,
+       AsyncAddIceCandidateFailsBeforeSetRemoteDescription) {
+  auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111));
+
+  auto caller = CreatePeerConnectionWithAudioVideo();
+  std::unique_ptr<IceCandidateInterface> jsep_candidate =
+      CreateIceCandidate(cricket::CN_AUDIO, 0, candidate);
+
+  bool operation_completed = false;
+  caller->pc()->AddIceCandidate(
+      std::move(jsep_candidate), [&operation_completed](RTCError result) {
+        EXPECT_FALSE(result.ok());
+        EXPECT_EQ(result.message(),
+                  std::string("Error processing ICE candidate"));
+        operation_completed = true;
+      });
+  EXPECT_TRUE_WAIT(operation_completed, kWaitTimeout);
+}
+
+TEST_P(PeerConnectionIceTest,
+       AsyncAddIceCandidateFailsIfPeerConnectionDestroyed) {
+  auto candidate = CreateLocalUdpCandidate(SocketAddress("1.1.1.1", 1111));
+
+  auto caller = CreatePeerConnectionWithAudioVideo();
+  auto callee = CreatePeerConnectionWithAudioVideo();
+
+  ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal()));
+
+  // Chain an operation that will block AddIceCandidate() from executing.
+  rtc::scoped_refptr<MockCreateSessionDescriptionObserver> answer_observer(
+      new rtc::RefCountedObject<MockCreateSessionDescriptionObserver>());
+  callee->pc()->CreateAnswer(answer_observer, RTCOfferAnswerOptions());
+
+  auto jsep_candidate =
+      callee->CreateJsepCandidateForFirstTransport(&candidate);
+  bool operation_completed = false;
+  callee->pc()->AddIceCandidate(
+      std::move(jsep_candidate), [&operation_completed](RTCError result) {
+        EXPECT_FALSE(result.ok());
+        EXPECT_EQ(
+            result.message(),
+            std::string(
+                "AddIceCandidate failed because the session was shut down"));
+        operation_completed = true;
+      });
+  // The operation will not be able to run until EXPECT_TRUE_WAIT(), giving us
+  // time to remove all references to the PeerConnection.
+  EXPECT_FALSE(operation_completed);
+  // This should delete the callee PC.
+  callee = nullptr;
+  EXPECT_TRUE_WAIT(operation_completed, kWaitTimeout);
+}
+
 TEST_P(PeerConnectionIceTest, LocalDescriptionUpdatedWhenContinualGathering) {
   const SocketAddress kLocalAddress("1.1.1.1", 0);
 
diff --git a/rtc_base/operations_chain.h b/rtc_base/operations_chain.h
index 94ff57b..b6ec46e 100644
--- a/rtc_base/operations_chain.h
+++ b/rtc_base/operations_chain.h
@@ -56,7 +56,14 @@
 #ifdef RTC_DCHECK_IS_ON
     has_run_ = true;
 #endif  // RTC_DCHECK_IS_ON
-    functor_(std::move(callback_));
+    // The functor being executed may invoke the callback synchronously,
+    // marking the operation as complete. As such, |this| OperationWithFunctor
+    // object may get deleted here, including destroying |functor_|. To
+    // protect the functor from self-destruction while running, it is moved to
+    // a local variable.
+    auto functor = std::move(functor_);
+    functor(std::move(callback_));
+    // |this| may now be deleted; don't touch any member variables.
   }
 
  private:
diff --git a/rtc_base/operations_chain_unittest.cc b/rtc_base/operations_chain_unittest.cc
index 8dbe607..968f94c 100644
--- a/rtc_base/operations_chain_unittest.cc
+++ b/rtc_base/operations_chain_unittest.cc
@@ -172,6 +172,38 @@
   scoped_refptr<OperationsChain> operations_chain_;
 };
 
+// On destruction, sets a boolean flag to true.
+class SignalOnDestruction final {
+ public:
+  SignalOnDestruction(bool* destructor_called)
+      : destructor_called_(destructor_called) {
+    RTC_DCHECK(destructor_called_);
+  }
+  ~SignalOnDestruction() {
+    // Moved objects will have |destructor_called_| set to null. Destroying a
+    // moved SignalOnDestruction should not signal.
+    if (destructor_called_) {
+      *destructor_called_ = true;
+    }
+  }
+
+  // Move operators.
+  SignalOnDestruction(SignalOnDestruction&& other)
+      : SignalOnDestruction(other.destructor_called_) {
+    other.destructor_called_ = nullptr;
+  }
+  SignalOnDestruction& operator=(SignalOnDestruction&& other) {
+    destructor_called_ = other.destructor_called_;
+    other.destructor_called_ = nullptr;
+    return *this;
+  }
+
+ private:
+  bool* destructor_called_;
+
+  RTC_DISALLOW_COPY_AND_ASSIGN(SignalOnDestruction);
+};
+
 TEST(OperationsChainTest, SynchronousOperation) {
   OperationTrackerProxy operation_tracker_proxy;
   operation_tracker_proxy.Initialize()->Wait(Event::kForever);
@@ -312,6 +344,29 @@
   async_operation_completed_event->Wait(Event::kForever);
 }
 
+TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) {
+  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
+
+  bool destructor_called = false;
+  SignalOnDestruction signal_on_destruction(&destructor_called);
+
+  operations_chain->ChainOperation(
+      [signal_on_destruction = std::move(signal_on_destruction),
+       &destructor_called](std::function<void()> callback) {
+        EXPECT_FALSE(destructor_called);
+        // Invoking the callback marks the operation as complete, popping the
+        // Operation object from the OperationsChain internal queue.
+        callback();
+        // Even though the internal Operation object has been destroyed,
+        // variables captured by this lambda expression must still be valid (the
+        // associated functor must not be deleted while executing).
+        EXPECT_FALSE(destructor_called);
+      });
+  // The lambda having executed synchronously and completed, its captured
+  // variables should now have been deleted.
+  EXPECT_TRUE(destructor_called);
+}
+
 #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
 
 TEST(OperationsChainTest, OperationNotInvokingCallbackShouldCrash) {