/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include <algorithm>
#include <string>
#include <utility>

#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/message_queue.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"

namespace rtc {
namespace {

const int kMaxMsgLatency = 150;  // 150 ms
const int kSlowDispatchLoggingThreshold = 50;  // 50 ms

class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
 public:
  MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
      RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
      : cs_(cs), processing_(processing) {
    cs_->Enter();
    *processing_ += 1;
  }

  ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
    *processing_ -= 1;
    cs_->Leave();
  }

 private:
  const CriticalSection* const cs_;
  size_t* processing_;

  RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
};
}  // namespace

//------------------------------------------------------------------
// MessageQueueManager

MessageQueueManager* MessageQueueManager::Instance() {
  static MessageQueueManager* const instance = new MessageQueueManager;
  return instance;
}

MessageQueueManager::MessageQueueManager() : processing_(0) {}

MessageQueueManager::~MessageQueueManager() {}

void MessageQueueManager::Add(MessageQueue* message_queue) {
  return Instance()->AddInternal(message_queue);
}
void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
  CritScope cs(&crit_);
  // Prevent changes while the list of message queues is processed.
  RTC_DCHECK_EQ(processing_, 0);
  message_queues_.push_back(message_queue);
}

void MessageQueueManager::Remove(MessageQueue* message_queue) {
  return Instance()->RemoveInternal(message_queue);
}
void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
  {
    CritScope cs(&crit_);
    // Prevent changes while the list of message queues is processed.
    RTC_DCHECK_EQ(processing_, 0);
    std::vector<MessageQueue*>::iterator iter;
    iter = std::find(message_queues_.begin(), message_queues_.end(),
                     message_queue);
    if (iter != message_queues_.end()) {
      message_queues_.erase(iter);
    }
  }
}

void MessageQueueManager::Clear(MessageHandler* handler) {
  return Instance()->ClearInternal(handler);
}
void MessageQueueManager::ClearInternal(MessageHandler* handler) {
  // Deleted objects may cause re-entrant calls to ClearInternal. This is
  // allowed as the list of message queues does not change while queues are
  // cleared.
  MarkProcessingCritScope cs(&crit_, &processing_);
  for (MessageQueue* queue : message_queues_) {
    queue->Clear(handler);
  }
}

void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
  return Instance()->ProcessAllMessageQueuesInternal();
}

void MessageQueueManager::ProcessAllMessageQueuesInternal() {
  // This works by posting a delayed message at the current time and waiting
  // for it to be dispatched on all queues, which will ensure that all messages
  // that came before it were also dispatched.
  volatile int queues_not_done = 0;

  // This class is used so that whether the posted message is processed, or the
  // message queue is simply cleared, queues_not_done gets decremented.
  class ScopedIncrement : public MessageData {
   public:
    ScopedIncrement(volatile int* value) : value_(value) {
      AtomicOps::Increment(value_);
    }
    ~ScopedIncrement() override { AtomicOps::Decrement(value_); }

   private:
    volatile int* value_;
  };

  {
    MarkProcessingCritScope cs(&crit_, &processing_);
    for (MessageQueue* queue : message_queues_) {
      if (!queue->IsProcessingMessagesForTesting()) {
        // If the queue is not processing messages, it can
        // be ignored. If we tried to post a message to it, it would be dropped
        // or ignored.
        continue;
      }
      queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
                         new ScopedIncrement(&queues_not_done));
    }
  }

  rtc::Thread* current = rtc::Thread::Current();
  // Note: One of the message queues may have been on this thread, which is
  // why we can't synchronously wait for queues_not_done to go to 0; we need
  // to process messages as well.
  while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
    if (current) {
      current->ProcessMessages(0);
    }
  }
}
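
// Illustrative use from a test (a sketch only; `worker`, `handler` and
// `TestData` are hypothetical objects defined by the test):
//
//   worker->Post(RTC_FROM_HERE, &handler, /*id=*/1, new TestData());
//   MessageQueueManager::ProcessAllMessageQueuesForTesting();
//   // Every message posted before the call above has now been dispatched
//   // (or deleted, if its queue was cleared in the meantime).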

//------------------------------------------------------------------
// MessageQueue
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
    : fPeekKeep_(false),
      dmsgq_next_num_(0),
      fInitialized_(false),
      fDestroyed_(false),
      stop_(0),
      ss_(ss) {
  RTC_DCHECK(ss);
  // Currently, MessageQueue holds a socket server, and is the base class for
  // Thread. It seems like it makes more sense for Thread to hold the socket
  // server, and provide it to the MessageQueue, since the Thread controls
  // the I/O model, and MQ is agnostic to those details. Anyway, this causes
  // messagequeue_unittest to depend on network libraries... yuck.
  ss_->SetMessageQueue(this);
  if (init_queue) {
    DoInit();
  }
}

MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
    : MessageQueue(ss.get(), init_queue) {
  own_ss_ = std::move(ss);
}

MessageQueue::~MessageQueue() {
  DoDestroy();
}

void MessageQueue::DoInit() {
  if (fInitialized_) {
    return;
  }

  fInitialized_ = true;
  MessageQueueManager::Add(this);
}

void MessageQueue::DoDestroy() {
  if (fDestroyed_) {
    return;
  }

  fDestroyed_ = true;
  // The signal is done from here to ensure
  // that it always gets called when the queue
  // is going away.
  SignalQueueDestroyed();
  MessageQueueManager::Remove(this);
  ClearInternal(nullptr, MQID_ANY, nullptr);

  if (ss_) {
    ss_->SetMessageQueue(nullptr);
  }
}

SocketServer* MessageQueue::socketserver() {
  return ss_;
}

void MessageQueue::WakeUpSocketServer() {
  ss_->WakeUp();
}

void MessageQueue::Quit() {
  AtomicOps::ReleaseStore(&stop_, 1);
  WakeUpSocketServer();
}

bool MessageQueue::IsQuitting() {
  return AtomicOps::AcquireLoad(&stop_) != 0;
}

bool MessageQueue::IsProcessingMessagesForTesting() {
  return !IsQuitting();
}

void MessageQueue::Restart() {
  AtomicOps::ReleaseStore(&stop_, 0);
}

bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    return true;
  }
  if (!Get(pmsg, cmsWait))
    return false;
  msgPeek_ = *pmsg;
  fPeekKeep_ = true;
  return true;
}

bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
  // Return and clear peek if present
  // Always return the peek if it exists so there is Peek/Get symmetry

  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    fPeekKeep_ = false;
    return true;
  }

  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch

  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // Check for sent messages
    ReceiveSends();

    // Check for posted events
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
      // All queue operations need to be locked, but nothing else in this loop
      // (specifically handling disposed message) can happen inside the crit.
      // Otherwise, disposed MessageHandlers will cause deadlocks.
      {
        CritScope cs(&crit_);
        // On the first pass, check for delayed messages that have been
        // triggered and calculate the next trigger time.
        if (first_pass) {
          first_pass = false;
          while (!dmsgq_.empty()) {
            if (msCurrent < dmsgq_.top().msTrigger_) {
              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
              break;
            }
            msgq_.push_back(dmsgq_.top().msg_);
            dmsgq_.pop();
          }
        }
        // Pull a message off the message queue, if available.
        if (msgq_.empty()) {
          break;
        } else {
          *pmsg = msgq_.front();
          msgq_.pop_front();
        }
      }  // crit_ is released here.

      // Log a warning for time-sensitive messages that we're late to deliver.
      if (pmsg->ts_sensitive) {
        int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
        if (delay > 0) {
          RTC_LOG_F(LS_WARNING)
              << "id: " << pmsg->message_id
              << " delay: " << (delay + kMaxMsgLatency) << "ms";
        }
      }
      // If this was a dispose message, delete it and skip it.
      if (MQID_DISPOSE == pmsg->message_id) {
        RTC_DCHECK(nullptr == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }

    if (IsQuitting())
      break;

    // Which is shorter, the delay wait or the asked wait?

    int64_t cmsNext;
    if (cmsWait == kForever) {
      cmsNext = cmsDelayNext;
    } else {
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
        cmsNext = cmsDelayNext;
    }

    {
      // Wait and multiplex in the meantime
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
        return false;
    }

    // If the specified timeout expired, return

    msCurrent = TimeMillis();
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}

void MessageQueue::ReceiveSends() {}

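// Typical posting calls (a sketch only; `queue`, `handler`, `kPingId` and
// `PingData` are hypothetical and defined by the caller):
//
//   queue->Post(RTC_FROM_HERE, &handler, kPingId, new PingData(payload));
//   queue->PostDelayed(RTC_FROM_HERE, /*cmsDelay=*/100, &handler, kPingId,
//                      new PingData(payload));
//
// The queue takes ownership of `pdata`: it deletes the data itself when a
// message is dropped because the queue is quitting, or when it is cleared
// without a `removed` list (see ClearInternal()).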
void MessageQueue::Post(const Location& posted_from,
                        MessageHandler* phandler,
                        uint32_t id,
                        MessageData* pdata,
                        bool time_sensitive) {
  if (IsQuitting()) {
    delete pdata;
    return;
  }

  // Keep thread safe
  // Add the message to the end of the queue
  // Signal for the multiplexer to return

  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    if (time_sensitive) {
      msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
    }
    msgq_.push_back(msg);
  }
  WakeUpSocketServer();
}

void MessageQueue::PostDelayed(const Location& posted_from,
                               int cmsDelay,
                               MessageHandler* phandler,
                               uint32_t id,
                               MessageData* pdata) {
  return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
                     pdata);
}

void MessageQueue::PostAt(const Location& posted_from,
                          uint32_t tstamp,
                          MessageHandler* phandler,
                          uint32_t id,
                          MessageData* pdata) {
  // This should work even if it is used (unexpectedly).
  int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
  return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
}

void MessageQueue::PostAt(const Location& posted_from,
                          int64_t tstamp,
                          MessageHandler* phandler,
                          uint32_t id,
                          MessageData* pdata) {
  return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
                     pdata);
}

void MessageQueue::DoDelayPost(const Location& posted_from,
                               int64_t cmsDelay,
                               int64_t tstamp,
                               MessageHandler* phandler,
                               uint32_t id,
                               MessageData* pdata) {
  if (IsQuitting()) {
    delete pdata;
    return;
  }

  // Keep thread safe
  // Add to the priority queue. Gets sorted soonest first.
  // Signal for the multiplexer to return.

  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    dmsgq_.push(dmsg);
    // If this message queue processes 1 message every millisecond for 50 days,
    // we will wrap this number. Even then, only messages with identical times
    // will be misordered, and then only briefly. This is probably ok.
    ++dmsgq_next_num_;
    RTC_DCHECK_NE(0, dmsgq_next_num_);
  }
  WakeUpSocketServer();
}

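// Returns 0 if a message is ready to be dispatched, the number of
// milliseconds until the earliest delayed message fires, or kForever if
// nothing is queued.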
int MessageQueue::GetDelay() {
  CritScope cs(&crit_);

  if (!msgq_.empty())
    return 0;

  if (!dmsgq_.empty()) {
    int delay = TimeUntil(dmsgq_.top().msTrigger_);
    if (delay < 0)
      delay = 0;
    return delay;
  }

  return kForever;
}

void MessageQueue::Clear(MessageHandler* phandler,
                         uint32_t id,
                         MessageList* removed) {
  CritScope cs(&crit_);
  ClearInternal(phandler, id, removed);
}

void MessageQueue::ClearInternal(MessageHandler* phandler,
                                 uint32_t id,
                                 MessageList* removed) {
  // Remove messages with phandler

  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    if (removed) {
      removed->push_back(msgPeek_);
    } else {
      delete msgPeek_.pdata;
    }
    fPeekKeep_ = false;
  }

  // Remove from ordered message queue

  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    if (it->Match(phandler, id)) {
      if (removed) {
        removed->push_back(*it);
      } else {
        delete it->pdata;
      }
      it = msgq_.erase(it);
    } else {
      ++it;
    }
  }

  // Remove from priority queue. Not directly iterable, so use this approach

  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
  for (PriorityQueue::container_type::iterator it = new_end;
       it != dmsgq_.container().end(); ++it) {
    if (it->msg_.Match(phandler, id)) {
      if (removed) {
        removed->push_back(it->msg_);
      } else {
        delete it->msg_.pdata;
      }
    } else {
      *new_end++ = *it;
    }
  }
  dmsgq_.container().erase(new_end, dmsgq_.container().end());
  dmsgq_.reheap();
}

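// Runs the message's handler synchronously on the calling thread and logs any
// dispatch that takes at least kSlowDispatchLoggingThreshold milliseconds.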
void MessageQueue::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
               pmsg->posted_from.file_and_line(), "src_func",
               pmsg->posted_from.function_name());
  int64_t start_time = TimeMillis();
  pmsg->phandler->OnMessage(pmsg);
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= kSlowDispatchLoggingThreshold) {
    RTC_LOG(LS_INFO) << "Message took " << diff
                     << "ms to dispatch. Posted from: "
                     << pmsg->posted_from.ToString();
  }
}

}  // namespace rtc