Makes Thread::Send execute sent messages after pending posted messages.
Bug: webrtc:11255
Change-Id: I4b9036d22c9db3a5ec0e19fc5f2f5ac0d7e2289a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/168058
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Ali Tofigh <alito@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30667}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index d23cf13..d8eb6b5 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -762,6 +762,7 @@
"network:sent_packet",
"system:file_wrapper",
"system:rtc_export",
+ "task_utils:to_queued_task",
"third_party/base64",
"third_party/sigslot",
"//third_party/abseil-cpp/absl/algorithm:container",
@@ -1333,6 +1334,7 @@
"../api/task_queue",
"../api/task_queue:task_queue_test",
"../test:fileutils",
+ "../test:rtc_expect_death",
"../test:test_main",
"../test:test_support",
"memory:fifo_buffer",
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index 00a582c..0fb2e81 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -34,6 +34,7 @@
#include "rtc_base/critical_section.h"
#include "rtc_base/logging.h"
#include "rtc_base/null_socket_server.h"
+#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
@@ -142,9 +143,44 @@
if (iter != message_queues_.end()) {
message_queues_.erase(iter);
}
+#if RTC_DCHECK_IS_ON
+ RemoveFromSendGraph(message_queue);
+#endif
}
}
+#if RTC_DCHECK_IS_ON
+void ThreadManager::RemoveFromSendGraph(Thread* thread) {
+ for (auto it = send_graph_.begin(); it != send_graph_.end();) {
+ if (it->first == thread) {
+ it = send_graph_.erase(it);
+ } else {
+ it->second.erase(thread);
+ ++it;
+ }
+ }
+}
+
+void ThreadManager::RegisterSendAndCheckForCycles(Thread* source,
+ Thread* target) {
+ CritScope cs(&crit_);
+ std::deque<Thread*> all_targets({target});
+ // We check the pre-existing who-sends-to-who graph for any path from target
+ // to source. This loop is guaranteed to terminate because per the send graph
+ // invariant, there are no cycles in the graph.
+ for (auto it = all_targets.begin(); it != all_targets.end(); ++it) {
+ const auto& targets = send_graph_[*it];
+ all_targets.insert(all_targets.end(), targets.begin(), targets.end());
+ }
+ RTC_CHECK_EQ(absl::c_count(all_targets, source), 0)
+ << " send loop between " << source->name() << " and " << target->name();
+
+ // We may now insert source -> target without creating a cycle, since there
+ // was no path from target to source per the prior CHECK.
+ send_graph_[source].insert(target);
+}
+#endif
+
// static
void ThreadManager::Clear(MessageHandler* handler) {
return Instance()->ClearInternal(handler);
@@ -404,9 +440,6 @@
int64_t msStart = TimeMillis();
int64_t msCurrent = msStart;
while (true) {
- // Check for sent messages
- ReceiveSendsFromThread(nullptr);
-
// Check for posted events
int64_t cmsDelayNext = kForever;
bool first_pass = true;
@@ -836,7 +869,7 @@
msg.message_id = id;
msg.pdata = pdata;
if (IsCurrent()) {
- phandler->OnMessage(&msg);
+ msg.phandler->OnMessage(&msg);
return;
}
@@ -845,27 +878,23 @@
AutoThread thread;
Thread* current_thread = Thread::Current();
RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this
-
+#if RTC_DCHECK_IS_ON
+ ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
+ this);
+#endif
bool ready = false;
- {
- CritScope cs(&crit_);
- _SendMessage smsg;
- smsg.thread = current_thread;
- smsg.msg = msg;
- smsg.ready = &ready;
- sendlist_.push_back(smsg);
- }
-
- // Wait for a reply
- WakeUpSocketServer();
+ PostTask(
+ webrtc::ToQueuedTask([msg]() mutable { msg.phandler->OnMessage(&msg); },
+ [this, &ready, current_thread] {
+ CritScope cs(&crit_);
+ ready = true;
+ current_thread->socketserver()->WakeUp();
+ }));
bool waited = false;
crit_.Enter();
while (!ready) {
crit_.Leave();
- // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
- // thread invoking calls on the current thread.
- current_thread->ReceiveSendsFromThread(this);
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
@@ -888,38 +917,6 @@
}
}
-void Thread::ReceiveSendsFromThread(const Thread* source) {
- // Receive a sent message. Cleanup scenarios:
- // - thread sending exits: We don't allow this, since thread can exit
- // only via Join, so Send must complete.
- // - thread receiving exits: Wakeup/set ready in Thread::Clear()
- // - object target cleared: Wakeup/set ready in Thread::Clear()
- _SendMessage smsg;
-
- crit_.Enter();
- while (PopSendMessageFromThread(source, &smsg)) {
- crit_.Leave();
-
- Dispatch(&smsg.msg);
-
- crit_.Enter();
- *smsg.ready = true;
- smsg.thread->socketserver()->WakeUp();
- }
- crit_.Leave();
-}
-
-bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
- for (auto it = sendlist_.begin(); it != sendlist_.end(); ++it) {
- if (it->thread == source || source == nullptr) {
- *msg = *it;
- sendlist_.erase(it);
- return true;
- }
- }
- return false;
-}
-
void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
@@ -981,26 +978,6 @@
uint32_t id,
MessageList* removed) {
CritScope cs(&crit_);
-
- // Remove messages on sendlist_ with phandler
- // Object target cleared: remove from send list, wakeup/set ready
- // if sender not null.
- for (auto iter = sendlist_.begin(); iter != sendlist_.end();) {
- _SendMessage smsg = *iter;
- if (smsg.msg.Match(phandler, id)) {
- if (removed) {
- removed->push_back(smsg.msg);
- } else {
- delete smsg.msg.pdata;
- }
- iter = sendlist_.erase(iter);
- *smsg.ready = true;
- smsg.thread->socketserver()->WakeUp();
- continue;
- }
- ++iter;
- }
-
ClearInternal(phandler, id, removed);
}
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index d08c3bd..74aab62 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -14,8 +14,10 @@
#include <stdint.h>
#include <list>
+#include <map>
#include <memory>
#include <queue>
+#include <set>
#include <string>
#include <type_traits>
#include <vector>
@@ -112,6 +114,13 @@
bool IsMainThread();
+#if RTC_DCHECK_IS_ON
+ // Registers that a Send operation is to be performed between |source| and
+ // |target|, while checking that this does not cause a send cycle that could
+ // potentially cause a deadlock.
+ void RegisterSendAndCheckForCycles(Thread* source, Thread* target);
+#endif
+
private:
ThreadManager();
~ThreadManager();
@@ -121,6 +130,9 @@
void RemoveInternal(Thread* message_queue);
void ClearInternal(MessageHandler* handler);
void ProcessAllMessageQueuesInternal();
+#if RTC_DCHECK_IS_ON
+ void RemoveFromSendGraph(Thread* thread) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+#endif
// This list contains all live Threads.
std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);
@@ -130,6 +142,12 @@
// calls.
CriticalSection crit_;
size_t processing_ RTC_GUARDED_BY(crit_) = 0;
+#if RTC_DCHECK_IS_ON
+ // Represents all thread seand actions by storing all send targets per thread.
+ // This is used by RegisterSendAndCheckForCycles. This graph has no cycles
+ // since we will trigger a CHECK failure if a cycle is introduced.
+ std::map<Thread*, std::set<Thread*>> send_graph_ RTC_GUARDED_BY(crit_);
+#endif
#if defined(WEBRTC_POSIX)
pthread_key_t key_;
@@ -145,13 +163,6 @@
RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
};
-struct _SendMessage {
- _SendMessage() {}
- Thread* thread;
- Message msg;
- bool* ready;
-};
-
// WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread().
class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
@@ -537,16 +548,6 @@
// Return true if the thread is currently running.
bool IsRunning();
- // Processes received "Send" requests. If |source| is not null, only requests
- // from |source| are processed, otherwise, all requests are processed.
- void ReceiveSendsFromThread(const Thread* source);
-
- // If |source| is not null, pops the first "Send" message from |source| in
- // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
- // The caller must lock |crit_| before calling.
- // Returns true if there is such a message.
- bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
-
void InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor);
@@ -570,7 +571,6 @@
// Used if SocketServer ownership lies with |this|.
std::unique_ptr<SocketServer> own_ss_;
- std::list<_SendMessage> sendlist_;
std::string name_;
// TODO(tommi): Add thread checks for proper use of control methods.
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index fb54bb5..91bea4f 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -24,6 +24,7 @@
#include "rtc_base/socket_address.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
+#include "test/testsupport/rtc_expect_death.h"
#if defined(WEBRTC_WIN)
#include <comdef.h> // NOLINT
@@ -307,29 +308,38 @@
}
// Verifies that two threads calling Invoke on each other at the same time does
-// not deadlock.
-TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
+// not deadlock but crash.
+#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
+TEST(ThreadTest, TwoThreadsInvokeDeathTest) {
+ ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
AutoThread thread;
- Thread* current_thread = Thread::Current();
- ASSERT_TRUE(current_thread != nullptr);
-
+ Thread* main_thread = Thread::Current();
auto other_thread = Thread::CreateWithSocketServer();
other_thread->Start();
-
- struct LocalFuncs {
- static void Set(bool* out) { *out = true; }
- static void InvokeSet(Thread* thread, bool* out) {
- thread->Invoke<void>(RTC_FROM_HERE, Bind(&Set, out));
- }
- };
-
- bool called = false;
- other_thread->Invoke<void>(
- RTC_FROM_HERE, Bind(&LocalFuncs::InvokeSet, current_thread, &called));
-
- EXPECT_TRUE(called);
+ other_thread->Invoke<void>(RTC_FROM_HERE, [main_thread] {
+ RTC_EXPECT_DEATH(main_thread->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
+ });
}
+TEST(ThreadTest, ThreeThreadsInvokeDeathTest) {
+ ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
+ AutoThread thread;
+ Thread* first = Thread::Current();
+
+ auto second = Thread::Create();
+ second->Start();
+ auto third = Thread::Create();
+ third->Start();
+
+ second->Invoke<void>(RTC_FROM_HERE, [&] {
+ third->Invoke<void>(RTC_FROM_HERE, [&] {
+ RTC_EXPECT_DEATH(first->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
+ });
+ });
+}
+
+#endif
+
// Verifies that if thread A invokes a call on thread B and thread C is trying
// to invoke A at the same time, thread A does not handle C's invoke while
// invoking B.