blob: 7bda92411c4115136c953062a221552ea1b279a8 [file] [log] [blame]
henrike@webrtc.org0e118e72013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#ifdef POSIX
29#include <sys/time.h>
30#endif
31
32#include "talk/base/common.h"
33#include "talk/base/logging.h"
34#include "talk/base/messagequeue.h"
henrika@webrtc.org8485ec62014-01-14 10:00:58 +000035#if defined(__native_client__)
36#include "talk/base/nullsocketserver.h"
37typedef talk_base::NullSocketServer DefaultSocketServer;
38#else
henrike@webrtc.org0e118e72013-07-10 00:45:36 +000039#include "talk/base/physicalsocketserver.h"
henrika@webrtc.org8485ec62014-01-14 10:00:58 +000040typedef talk_base::PhysicalSocketServer DefaultSocketServer;
41#endif
henrike@webrtc.org0e118e72013-07-10 00:45:36 +000042
43namespace talk_base {
44
45const uint32 kMaxMsgLatency = 150; // 150 ms
46
47//------------------------------------------------------------------
48// MessageQueueManager
49
sergeyu@chromium.orga487db22013-08-23 23:21:25 +000050MessageQueueManager* MessageQueueManager::instance_ = NULL;
henrike@webrtc.org0e118e72013-07-10 00:45:36 +000051
52MessageQueueManager* MessageQueueManager::Instance() {
53 // Note: This is not thread safe, but it is first called before threads are
54 // spawned.
55 if (!instance_)
56 instance_ = new MessageQueueManager;
57 return instance_;
58}
59
sergeyu@chromium.orga487db22013-08-23 23:21:25 +000060bool MessageQueueManager::IsInitialized() {
61 return instance_ != NULL;
62}
63
henrike@webrtc.org0e118e72013-07-10 00:45:36 +000064MessageQueueManager::MessageQueueManager() {
65}
66
67MessageQueueManager::~MessageQueueManager() {
68}
69
70void MessageQueueManager::Add(MessageQueue *message_queue) {
sergeyu@chromium.orga487db22013-08-23 23:21:25 +000071 return Instance()->AddInternal(message_queue);
72}
73void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
henrike@webrtc.org0e118e72013-07-10 00:45:36 +000074 // MessageQueueManager methods should be non-reentrant, so we
75 // ASSERT that is the case. If any of these ASSERT, please
76 // contact bpm or jbeda.
77 ASSERT(!crit_.CurrentThreadIsOwner());
78 CritScope cs(&crit_);
79 message_queues_.push_back(message_queue);
80}
81
82void MessageQueueManager::Remove(MessageQueue *message_queue) {
sergeyu@chromium.orga487db22013-08-23 23:21:25 +000083 // If there isn't a message queue manager instance, then there isn't a queue
84 // to remove.
85 if (!instance_) return;
86 return Instance()->RemoveInternal(message_queue);
87}
88void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.org0e118e72013-07-10 00:45:36 +000089 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
90 // If this is the last MessageQueue, destroy the manager as well so that
91 // we don't leak this object at program shutdown. As mentioned above, this is
92 // not thread-safe, but this should only happen at program termination (when
93 // the ThreadManager is destroyed, and threads are no longer active).
94 bool destroy = false;
95 {
96 CritScope cs(&crit_);
97 std::vector<MessageQueue *>::iterator iter;
98 iter = std::find(message_queues_.begin(), message_queues_.end(),
99 message_queue);
100 if (iter != message_queues_.end()) {
101 message_queues_.erase(iter);
102 }
103 destroy = message_queues_.empty();
104 }
105 if (destroy) {
106 instance_ = NULL;
107 delete this;
108 }
109}
110
111void MessageQueueManager::Clear(MessageHandler *handler) {
sergeyu@chromium.orga487db22013-08-23 23:21:25 +0000112 // If there isn't a message queue manager instance, then there aren't any
113 // queues to remove this handler from.
114 if (!instance_) return;
115 return Instance()->ClearInternal(handler);
116}
117void MessageQueueManager::ClearInternal(MessageHandler *handler) {
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000118 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
119 CritScope cs(&crit_);
120 std::vector<MessageQueue *>::iterator iter;
121 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
122 (*iter)->Clear(handler);
123}
124
125//------------------------------------------------------------------
126// MessageQueue
127
128MessageQueue::MessageQueue(SocketServer* ss)
fischman@webrtc.org934c9032014-05-19 17:58:04 +0000129 : ss_(ss), fStop_(false), fPeekKeep_(false),
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000130 dmsgq_next_num_(0) {
131 if (!ss_) {
132 // Currently, MessageQueue holds a socket server, and is the base class for
133 // Thread. It seems like it makes more sense for Thread to hold the socket
134 // server, and provide it to the MessageQueue, since the Thread controls
135 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
136 // messagequeue_unittest to depend on network libraries... yuck.
henrika@webrtc.org8485ec62014-01-14 10:00:58 +0000137 default_ss_.reset(new DefaultSocketServer());
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000138 ss_ = default_ss_.get();
139 }
140 ss_->SetMessageQueue(this);
fischman@webrtc.org934c9032014-05-19 17:58:04 +0000141 MessageQueueManager::Add(this);
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000142}
143
144MessageQueue::~MessageQueue() {
145 // The signal is done from here to ensure
146 // that it always gets called when the queue
147 // is going away.
148 SignalQueueDestroyed();
fischman@webrtc.org934c9032014-05-19 17:58:04 +0000149 MessageQueueManager::Remove(this);
150 Clear(NULL);
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000151 if (ss_) {
152 ss_->SetMessageQueue(NULL);
153 }
154}
155
156void MessageQueue::set_socketserver(SocketServer* ss) {
157 ss_ = ss ? ss : default_ss_.get();
158 ss_->SetMessageQueue(this);
159}
160
161void MessageQueue::Quit() {
162 fStop_ = true;
163 ss_->WakeUp();
164}
165
166bool MessageQueue::IsQuitting() {
167 return fStop_;
168}
169
170void MessageQueue::Restart() {
171 fStop_ = false;
172}
173
174bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
175 if (fPeekKeep_) {
176 *pmsg = msgPeek_;
177 return true;
178 }
179 if (!Get(pmsg, cmsWait))
180 return false;
181 msgPeek_ = *pmsg;
182 fPeekKeep_ = true;
183 return true;
184}
185
186bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
187 // Return and clear peek if present
188 // Always return the peek if it exists so there is Peek/Get symmetry
189
190 if (fPeekKeep_) {
191 *pmsg = msgPeek_;
192 fPeekKeep_ = false;
193 return true;
194 }
195
196 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
197
198 int cmsTotal = cmsWait;
199 int cmsElapsed = 0;
200 uint32 msStart = Time();
201 uint32 msCurrent = msStart;
202 while (true) {
203 // Check for sent messages
204 ReceiveSends();
205
206 // Check for posted events
207 int cmsDelayNext = kForever;
208 bool first_pass = true;
209 while (true) {
210 // All queue operations need to be locked, but nothing else in this loop
211 // (specifically handling disposed message) can happen inside the crit.
212 // Otherwise, disposed MessageHandlers will cause deadlocks.
213 {
214 CritScope cs(&crit_);
215 // On the first pass, check for delayed messages that have been
216 // triggered and calculate the next trigger time.
217 if (first_pass) {
218 first_pass = false;
219 while (!dmsgq_.empty()) {
220 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
221 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
222 break;
223 }
224 msgq_.push_back(dmsgq_.top().msg_);
225 dmsgq_.pop();
226 }
227 }
228 // Pull a message off the message queue, if available.
229 if (msgq_.empty()) {
230 break;
231 } else {
232 *pmsg = msgq_.front();
233 msgq_.pop_front();
234 }
235 } // crit_ is released here.
236
237 // Log a warning for time-sensitive messages that we're late to deliver.
238 if (pmsg->ts_sensitive) {
239 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
240 if (delay > 0) {
241 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
242 << (delay + kMaxMsgLatency) << "ms";
243 }
244 }
245 // If this was a dispose message, delete it and skip it.
246 if (MQID_DISPOSE == pmsg->message_id) {
247 ASSERT(NULL == pmsg->phandler);
248 delete pmsg->pdata;
249 *pmsg = Message();
250 continue;
251 }
252 return true;
253 }
254
255 if (fStop_)
256 break;
257
258 // Which is shorter, the delay wait or the asked wait?
259
260 int cmsNext;
261 if (cmsWait == kForever) {
262 cmsNext = cmsDelayNext;
263 } else {
264 cmsNext = _max(0, cmsTotal - cmsElapsed);
265 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
266 cmsNext = cmsDelayNext;
267 }
268
269 // Wait and multiplex in the meantime
270 if (!ss_->Wait(cmsNext, process_io))
271 return false;
272
273 // If the specified timeout expired, return
274
275 msCurrent = Time();
276 cmsElapsed = TimeDiff(msCurrent, msStart);
277 if (cmsWait != kForever) {
278 if (cmsElapsed >= cmsWait)
279 return false;
280 }
281 }
282 return false;
283}
284
285void MessageQueue::ReceiveSends() {
286}
287
288void MessageQueue::Post(MessageHandler *phandler, uint32 id,
289 MessageData *pdata, bool time_sensitive) {
290 if (fStop_)
291 return;
292
293 // Keep thread safe
294 // Add the message to the end of the queue
295 // Signal for the multiplexer to return
296
297 CritScope cs(&crit_);
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000298 Message msg;
299 msg.phandler = phandler;
300 msg.message_id = id;
301 msg.pdata = pdata;
302 if (time_sensitive) {
303 msg.ts_sensitive = Time() + kMaxMsgLatency;
304 }
305 msgq_.push_back(msg);
306 ss_->WakeUp();
307}
308
309void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
310 MessageHandler *phandler, uint32 id, MessageData* pdata) {
311 if (fStop_)
312 return;
313
314 // Keep thread safe
315 // Add to the priority queue. Gets sorted soonest first.
316 // Signal for the multiplexer to return.
317
318 CritScope cs(&crit_);
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000319 Message msg;
320 msg.phandler = phandler;
321 msg.message_id = id;
322 msg.pdata = pdata;
323 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
324 dmsgq_.push(dmsg);
325 // If this message queue processes 1 message every millisecond for 50 days,
326 // we will wrap this number. Even then, only messages with identical times
327 // will be misordered, and then only briefly. This is probably ok.
328 VERIFY(0 != ++dmsgq_next_num_);
329 ss_->WakeUp();
330}
331
332int MessageQueue::GetDelay() {
333 CritScope cs(&crit_);
334
335 if (!msgq_.empty())
336 return 0;
337
338 if (!dmsgq_.empty()) {
339 int delay = TimeUntil(dmsgq_.top().msTrigger_);
340 if (delay < 0)
341 delay = 0;
342 return delay;
343 }
344
345 return kForever;
346}
347
348void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
349 MessageList* removed) {
350 CritScope cs(&crit_);
351
352 // Remove messages with phandler
353
354 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
355 if (removed) {
356 removed->push_back(msgPeek_);
357 } else {
358 delete msgPeek_.pdata;
359 }
360 fPeekKeep_ = false;
361 }
362
363 // Remove from ordered message queue
364
365 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
366 if (it->Match(phandler, id)) {
367 if (removed) {
368 removed->push_back(*it);
369 } else {
370 delete it->pdata;
371 }
372 it = msgq_.erase(it);
373 } else {
374 ++it;
375 }
376 }
377
378 // Remove from priority queue. Not directly iterable, so use this approach
379
380 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
381 for (PriorityQueue::container_type::iterator it = new_end;
382 it != dmsgq_.container().end(); ++it) {
383 if (it->msg_.Match(phandler, id)) {
384 if (removed) {
385 removed->push_back(it->msg_);
386 } else {
387 delete it->msg_.pdata;
388 }
389 } else {
390 *new_end++ = *it;
391 }
392 }
393 dmsgq_.container().erase(new_end, dmsgq_.container().end());
394 dmsgq_.reheap();
395}
396
397void MessageQueue::Dispatch(Message *pmsg) {
398 pmsg->phandler->OnMessage(pmsg);
399}
400
henrike@webrtc.org0e118e72013-07-10 00:45:36 +0000401} // namespace talk_base