Makes Thread::Send execute sent messages after pending posted messages.

Bug: webrtc:11255
Change-Id: I4b9036d22c9db3a5ec0e19fc5f2f5ac0d7e2289a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/168058
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Ali Tofigh <alito@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30667}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index d23cf13..d8eb6b5 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -762,6 +762,7 @@
     "network:sent_packet",
     "system:file_wrapper",
     "system:rtc_export",
+    "task_utils:to_queued_task",
     "third_party/base64",
     "third_party/sigslot",
     "//third_party/abseil-cpp/absl/algorithm:container",
@@ -1333,6 +1334,7 @@
       "../api/task_queue",
       "../api/task_queue:task_queue_test",
       "../test:fileutils",
+      "../test:rtc_expect_death",
       "../test:test_main",
       "../test:test_support",
       "memory:fifo_buffer",
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index 00a582c..0fb2e81 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -34,6 +34,7 @@
 #include "rtc_base/critical_section.h"
 #include "rtc_base/logging.h"
 #include "rtc_base/null_socket_server.h"
+#include "rtc_base/task_utils/to_queued_task.h"
 #include "rtc_base/time_utils.h"
 #include "rtc_base/trace_event.h"
 
@@ -142,9 +143,44 @@
     if (iter != message_queues_.end()) {
       message_queues_.erase(iter);
     }
+#if RTC_DCHECK_IS_ON
+    RemoveFromSendGraph(message_queue);
+#endif
   }
 }
 
+#if RTC_DCHECK_IS_ON
+void ThreadManager::RemoveFromSendGraph(Thread* thread) {
+  for (auto it = send_graph_.begin(); it != send_graph_.end();) {
+    if (it->first == thread) {
+      it = send_graph_.erase(it);
+    } else {
+      it->second.erase(thread);
+      ++it;
+    }
+  }
+}
+
+void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
+                                                  Thread* target) {
+  CritScope cs(&crit_);
+  std::deque<Thread*> all_targets({target});
+  // We check the pre-existing who-sends-to-who graph for any path from target
+  // to source. This loop is guaranteed to terminate because per the send graph
+  // invariant, there are no cycles in the graph.
+  for (auto it = all_targets.begin(); it != all_targets.end(); ++it) {
+    const auto& targets = send_graph_[*it];
+    all_targets.insert(all_targets.end(), targets.begin(), targets.end());
+  }
+  RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
+      << " send loop between " << source->name() << " and " << target->name();
+
+  // We may now insert source -> target without creating a cycle, since there
+  // was no path from target to source per the prior CHECK.
+  send_graph_[source].insert(target);
+}
+#endif
+
 // static
 void ThreadManager::Clear(MessageHandler* handler) {
   return Instance()->ClearInternal(handler);
@@ -404,9 +440,6 @@
   int64_t msStart = TimeMillis();
   int64_t msCurrent = msStart;
   while (true) {
-    // Check for sent messages
-    ReceiveSendsFromThread(nullptr);
-
     // Check for posted events
     int64_t cmsDelayNext = kForever;
     bool first_pass = true;
@@ -836,7 +869,7 @@
   msg.message_id = id;
   msg.pdata = pdata;
   if (IsCurrent()) {
-    phandler->OnMessage(&msg);
+    msg.phandler->OnMessage(&msg);
     return;
   }
 
@@ -845,27 +878,23 @@
   AutoThread thread;
   Thread* current_thread = Thread::Current();
   RTC_DCHECK(current_thread != nullptr);  // AutoThread ensures this
-
+#if RTC_DCHECK_IS_ON
+  ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
+                                                           this);
+#endif
   bool ready = false;
-  {
-    CritScope cs(&crit_);
-    _SendMessage smsg;
-    smsg.thread = current_thread;
-    smsg.msg = msg;
-    smsg.ready = &ready;
-    sendlist_.push_back(smsg);
-  }
-
-  // Wait for a reply
-  WakeUpSocketServer();
+  PostTask(
+      webrtc::ToQueuedTask([msg]() mutable { msg.phandler->OnMessage(&msg); },
+                           [this, &ready, current_thread] {
+                             CritScope cs(&crit_);
+                             ready = true;
+                             current_thread->socketserver()->WakeUp();
+                           }));
 
   bool waited = false;
   crit_.Enter();
   while (!ready) {
     crit_.Leave();
-    // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
-    // thread invoking calls on the current thread.
-    current_thread->ReceiveSendsFromThread(this);
     current_thread->socketserver()->Wait(kForever, false);
     waited = true;
     crit_.Enter();
@@ -888,38 +917,6 @@
   }
 }
 
-void Thread::ReceiveSendsFromThread(const Thread* source) {
-  // Receive a sent message. Cleanup scenarios:
-  // - thread sending exits: We don't allow this, since thread can exit
-  //   only via Join, so Send must complete.
-  // - thread receiving exits: Wakeup/set ready in Thread::Clear()
-  // - object target cleared: Wakeup/set ready in Thread::Clear()
-  _SendMessage smsg;
-
-  crit_.Enter();
-  while (PopSendMessageFromThread(source, &smsg)) {
-    crit_.Leave();
-
-    Dispatch(&smsg.msg);
-
-    crit_.Enter();
-    *smsg.ready = true;
-    smsg.thread->socketserver()->WakeUp();
-  }
-  crit_.Leave();
-}
-
-bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
-  for (auto it = sendlist_.begin(); it != sendlist_.end(); ++it) {
-    if (it->thread == source || source == nullptr) {
-      *msg = *it;
-      sendlist_.erase(it);
-      return true;
-    }
-  }
-  return false;
-}
-
 void Thread::InvokeInternal(const Location& posted_from,
                             rtc::FunctionView<void()> functor) {
   TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
@@ -981,26 +978,6 @@
                    uint32_t id,
                    MessageList* removed) {
   CritScope cs(&crit_);
-
-  // Remove messages on sendlist_ with phandler
-  // Object target cleared: remove from send list, wakeup/set ready
-  // if sender not null.
-  for (auto iter = sendlist_.begin(); iter != sendlist_.end();) {
-    _SendMessage smsg = *iter;
-    if (smsg.msg.Match(phandler, id)) {
-      if (removed) {
-        removed->push_back(smsg.msg);
-      } else {
-        delete smsg.msg.pdata;
-      }
-      iter = sendlist_.erase(iter);
-      *smsg.ready = true;
-      smsg.thread->socketserver()->WakeUp();
-      continue;
-    }
-    ++iter;
-  }
-
   ClearInternal(phandler, id, removed);
 }
 
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index d08c3bd..74aab62 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -14,8 +14,10 @@
 #include <stdint.h>
 
 #include <list>
+#include <map>
 #include <memory>
 #include <queue>
+#include <set>
 #include <string>
 #include <type_traits>
 #include <vector>
@@ -112,6 +114,13 @@
 
   bool IsMainThread();
 
+#if RTC_DCHECK_IS_ON
+  // Registers that a Send operation is to be performed between |source| and
+  // |target|, while checking that this does not cause a send cycle that could
+  // potentially cause a deadlock.
+  void RegisterSendAndCheckForCycles(Thread* source, Thread* target);
+#endif
+
  private:
   ThreadManager();
   ~ThreadManager();
@@ -121,6 +130,9 @@
   void RemoveInternal(Thread* message_queue);
   void ClearInternal(MessageHandler* handler);
   void ProcessAllMessageQueuesInternal();
+#if RTC_DCHECK_IS_ON
+  void RemoveFromSendGraph(Thread* thread) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+#endif
 
   // This list contains all live Threads.
   std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);
@@ -130,6 +142,12 @@
   // calls.
   CriticalSection crit_;
   size_t processing_ RTC_GUARDED_BY(crit_) = 0;
+#if RTC_DCHECK_IS_ON
+  // Represents all thread seand actions by storing all send targets per thread.
+  // This is used by RegisterSendAndCheckForCycles. This graph has no cycles
+  // since we will trigger a CHECK failure if a cycle is introduced.
+  std::map<Thread*, std::set<Thread*>> send_graph_ RTC_GUARDED_BY(crit_);
+#endif
 
 #if defined(WEBRTC_POSIX)
   pthread_key_t key_;
@@ -145,13 +163,6 @@
   RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
 };
 
-struct _SendMessage {
-  _SendMessage() {}
-  Thread* thread;
-  Message msg;
-  bool* ready;
-};
-
 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS!  See ~Thread().
 
 class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
@@ -537,16 +548,6 @@
   // Return true if the thread is currently running.
   bool IsRunning();
 
-  // Processes received "Send" requests. If |source| is not null, only requests
-  // from |source| are processed, otherwise, all requests are processed.
-  void ReceiveSendsFromThread(const Thread* source);
-
-  // If |source| is not null, pops the first "Send" message from |source| in
-  // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
-  // The caller must lock |crit_| before calling.
-  // Returns true if there is such a message.
-  bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
-
   void InvokeInternal(const Location& posted_from,
                       rtc::FunctionView<void()> functor);
 
@@ -570,7 +571,6 @@
   // Used if SocketServer ownership lies with |this|.
   std::unique_ptr<SocketServer> own_ss_;
 
-  std::list<_SendMessage> sendlist_;
   std::string name_;
 
   // TODO(tommi): Add thread checks for proper use of control methods.
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index fb54bb5..91bea4f 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -24,6 +24,7 @@
 #include "rtc_base/socket_address.h"
 #include "rtc_base/task_utils/to_queued_task.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
+#include "test/testsupport/rtc_expect_death.h"
 
 #if defined(WEBRTC_WIN)
 #include <comdef.h>  // NOLINT
@@ -307,29 +308,38 @@
 }
 
 // Verifies that two threads calling Invoke on each other at the same time does
-// not deadlock.
-TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
+// not deadlock but crash.
+#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
+TEST(ThreadTest, TwoThreadsInvokeDeathTest) {
+  ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
   AutoThread thread;
-  Thread* current_thread = Thread::Current();
-  ASSERT_TRUE(current_thread != nullptr);
-
+  Thread* main_thread = Thread::Current();
   auto other_thread = Thread::CreateWithSocketServer();
   other_thread->Start();
-
-  struct LocalFuncs {
-    static void Set(bool* out) { *out = true; }
-    static void InvokeSet(Thread* thread, bool* out) {
-      thread->Invoke<void>(RTC_FROM_HERE, Bind(&Set, out));
-    }
-  };
-
-  bool called = false;
-  other_thread->Invoke<void>(
-      RTC_FROM_HERE, Bind(&LocalFuncs::InvokeSet, current_thread, &called));
-
-  EXPECT_TRUE(called);
+  other_thread->Invoke<void>(RTC_FROM_HERE, [main_thread] {
+    RTC_EXPECT_DEATH(main_thread->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
+  });
 }
 
+TEST(ThreadTest, ThreeThreadsInvokeDeathTest) {
+  ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
+  AutoThread thread;
+  Thread* first = Thread::Current();
+
+  auto second = Thread::Create();
+  second->Start();
+  auto third = Thread::Create();
+  third->Start();
+
+  second->Invoke<void>(RTC_FROM_HERE, [&] {
+    third->Invoke<void>(RTC_FROM_HERE, [&] {
+      RTC_EXPECT_DEATH(first->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
+    });
+  });
+}
+
+#endif
+
 // Verifies that if thread A invokes a call on thread B and thread C is trying
 // to invoke A at the same time, thread A does not handle C's invoke while
 // invoking B.