blob: bb58ebccacc4c633761b86eb3135b543944db5fa [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Steve Anton10542f22019-01-11 09:11:00 -080011#ifndef RTC_BASE_MESSAGE_QUEUE_H_
12#define RTC_BASE_MESSAGE_QUEUE_H_
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000013
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020014#include <string.h>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000015
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020016#include <algorithm>
17#include <list>
18#include <memory>
19#include <queue>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020020#include <vector>
21
Mirko Bonadeid9708072019-01-25 20:26:48 +010022#include "api/scoped_refptr.h"
Steve Anton10542f22019-01-11 09:11:00 -080023#include "rtc_base/constructor_magic.h"
24#include "rtc_base/critical_section.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020025#include "rtc_base/location.h"
Steve Anton10542f22019-01-11 09:11:00 -080026#include "rtc_base/message_handler.h"
Steve Anton10542f22019-01-11 09:11:00 -080027#include "rtc_base/socket_server.h"
Mirko Bonadei35214fc2019-09-23 14:54:28 +020028#include "rtc_base/system/rtc_export.h"
Artem Titove41c4332018-07-25 15:04:28 +020029#include "rtc_base/third_party/sigslot/sigslot.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020030#include "rtc_base/thread_annotations.h"
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020031
32namespace rtc {
33
34struct Message;
35class MessageQueue;
36
37// MessageQueueManager does cleanup of of message queues
38
Mirko Bonadei35214fc2019-09-23 14:54:28 +020039class RTC_EXPORT MessageQueueManager {
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020040 public:
Yves Gerey665174f2018-06-19 15:03:05 +020041 static void Add(MessageQueue* message_queue);
42 static void Remove(MessageQueue* message_queue);
43 static void Clear(MessageHandler* handler);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020044
Niels Möller8909a632018-09-06 08:42:44 +020045 // TODO(nisse): Delete alias, as soon as downstream code is updated.
46 static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }
47
48 // For testing purposes, for use with a simulated clock.
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020049 // Ensures that all message queues have processed delayed messages
50 // up until the current point in time.
Niels Möller8909a632018-09-06 08:42:44 +020051 static void ProcessAllMessageQueuesForTesting();
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020052
53 private:
54 static MessageQueueManager* Instance();
55
56 MessageQueueManager();
57 ~MessageQueueManager();
58
Yves Gerey665174f2018-06-19 15:03:05 +020059 void AddInternal(MessageQueue* message_queue);
60 void RemoveInternal(MessageQueue* message_queue);
61 void ClearInternal(MessageHandler* handler);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020062 void ProcessAllMessageQueuesInternal();
63
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020064 // This list contains all live MessageQueues.
danilchap3c6abd22017-09-06 05:46:29 -070065 std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020066
jbauch5b361732017-07-06 23:51:37 -070067 // Methods that don't modify the list of message queues may be called in a
68 // re-entrant fashion. "processing_" keeps track of the depth of re-entrant
69 // calls.
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020070 CriticalSection crit_;
danilchap3c6abd22017-09-06 05:46:29 -070071 size_t processing_ RTC_GUARDED_BY(crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020072};
73
74// Derive from this for specialized data
75// App manages lifetime, except when messages are purged
76
77class MessageData {
78 public:
79 MessageData() {}
80 virtual ~MessageData() {}
81};
82
83template <class T>
84class TypedMessageData : public MessageData {
85 public:
Yves Gerey665174f2018-06-19 15:03:05 +020086 explicit TypedMessageData(const T& data) : data_(data) {}
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020087 const T& data() const { return data_; }
88 T& data() { return data_; }
Yves Gerey665174f2018-06-19 15:03:05 +020089
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020090 private:
91 T data_;
92};
93
94// Like TypedMessageData, but for pointers that require a delete.
95template <class T>
96class ScopedMessageData : public MessageData {
97 public:
98 explicit ScopedMessageData(std::unique_ptr<T> data)
99 : data_(std::move(data)) {}
100 // Deprecated.
101 // TODO(deadbeef): Remove this once downstream applications stop using it.
102 explicit ScopedMessageData(T* data) : data_(data) {}
103 // Deprecated.
104 // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
105 // this once downstream applications stop using it, then rename inner_data to
106 // just data.
107 const std::unique_ptr<T>& data() const { return data_; }
108 std::unique_ptr<T>& data() { return data_; }
109
110 const T& inner_data() const { return *data_; }
111 T& inner_data() { return *data_; }
112
113 private:
114 std::unique_ptr<T> data_;
115};
116
117// Like ScopedMessageData, but for reference counted pointers.
118template <class T>
119class ScopedRefMessageData : public MessageData {
120 public:
Yves Gerey665174f2018-06-19 15:03:05 +0200121 explicit ScopedRefMessageData(T* data) : data_(data) {}
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200122 const scoped_refptr<T>& data() const { return data_; }
123 scoped_refptr<T>& data() { return data_; }
Yves Gerey665174f2018-06-19 15:03:05 +0200124
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200125 private:
126 scoped_refptr<T> data_;
127};
128
Yves Gerey665174f2018-06-19 15:03:05 +0200129template <class T>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200130inline MessageData* WrapMessageData(const T& data) {
131 return new TypedMessageData<T>(data);
132}
133
Yves Gerey665174f2018-06-19 15:03:05 +0200134template <class T>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200135inline const T& UseMessageData(MessageData* data) {
Yves Gerey665174f2018-06-19 15:03:05 +0200136 return static_cast<TypedMessageData<T>*>(data)->data();
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200137}
138
Yves Gerey665174f2018-06-19 15:03:05 +0200139template <class T>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200140class DisposeData : public MessageData {
141 public:
Yves Gerey665174f2018-06-19 15:03:05 +0200142 explicit DisposeData(T* data) : data_(data) {}
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200143 virtual ~DisposeData() { delete data_; }
Yves Gerey665174f2018-06-19 15:03:05 +0200144
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200145 private:
146 T* data_;
147};
148
149const uint32_t MQID_ANY = static_cast<uint32_t>(-1);
150const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2);
151
152// No destructor
153
154struct Message {
155 Message()
156 : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
157 inline bool Match(MessageHandler* handler, uint32_t id) const {
158 return (handler == nullptr || handler == phandler) &&
159 (id == MQID_ANY || id == message_id);
160 }
161 Location posted_from;
Yves Gerey665174f2018-06-19 15:03:05 +0200162 MessageHandler* phandler;
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200163 uint32_t message_id;
Yves Gerey665174f2018-06-19 15:03:05 +0200164 MessageData* pdata;
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200165 int64_t ts_sensitive;
166};
167
168typedef std::list<Message> MessageList;
169
170// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
171// with the same trigger time are processed in num_ (FIFO) order.
172
173class DelayedMessage {
174 public:
175 DelayedMessage(int64_t delay,
176 int64_t trigger,
177 uint32_t num,
178 const Message& msg)
179 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
180
Yves Gerey665174f2018-06-19 15:03:05 +0200181 bool operator<(const DelayedMessage& dmsg) const {
182 return (dmsg.msTrigger_ < msTrigger_) ||
183 ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200184 }
185
186 int64_t cmsDelay_; // for debugging
187 int64_t msTrigger_;
188 uint32_t num_;
189 Message msg_;
190};
191
Mirko Bonadei35214fc2019-09-23 14:54:28 +0200192class RTC_EXPORT MessageQueue {
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200193 public:
194 static const int kForever = -1;
195
196 // Create a new MessageQueue and optionally assign it to the passed
197 // SocketServer. Subclasses that override Clear should pass false for
198 // init_queue and call DoInit() from their constructor to prevent races
199 // with the MessageQueueManager using the object while the vtable is still
200 // being created.
201 MessageQueue(SocketServer* ss, bool init_queue);
202 MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue);
203
204 // NOTE: SUBCLASSES OF MessageQueue THAT OVERRIDE Clear MUST CALL
205 // DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race
206 // between the destructor modifying the vtable, and the MessageQueueManager
207 // calling Clear on the object from a different thread.
208 virtual ~MessageQueue();
209
210 SocketServer* socketserver();
211
212 // Note: The behavior of MessageQueue has changed. When a MQ is stopped,
213 // futher Posts and Sends will fail. However, any pending Sends and *ready*
214 // Posts (as opposed to unexpired delayed Posts) will be delivered before
215 // Get (or Peek) returns false. By guaranteeing delivery of those messages,
216 // we eliminate the race condition when an MessageHandler and MessageQueue
217 // may be destroyed independently of each other.
218 virtual void Quit();
219 virtual bool IsQuitting();
220 virtual void Restart();
221 // Not all message queues actually process messages (such as SignalThread).
222 // In those cases, it's important to know, before posting, that it won't be
223 // Processed. Normally, this would be true until IsQuitting() is true.
Niels Möller8909a632018-09-06 08:42:44 +0200224 virtual bool IsProcessingMessagesForTesting();
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200225
226 // Get() will process I/O until:
227 // 1) A message is available (returns true)
228 // 2) cmsWait seconds have elapsed (returns false)
229 // 3) Stop() is called (returns false)
Yves Gerey665174f2018-06-19 15:03:05 +0200230 virtual bool Get(Message* pmsg,
231 int cmsWait = kForever,
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200232 bool process_io = true);
Yves Gerey665174f2018-06-19 15:03:05 +0200233 virtual bool Peek(Message* pmsg, int cmsWait = 0);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200234 virtual void Post(const Location& posted_from,
235 MessageHandler* phandler,
236 uint32_t id = 0,
237 MessageData* pdata = nullptr,
238 bool time_sensitive = false);
239 virtual void PostDelayed(const Location& posted_from,
240 int cmsDelay,
241 MessageHandler* phandler,
242 uint32_t id = 0,
243 MessageData* pdata = nullptr);
244 virtual void PostAt(const Location& posted_from,
245 int64_t tstamp,
246 MessageHandler* phandler,
247 uint32_t id = 0,
248 MessageData* pdata = nullptr);
249 // TODO(honghaiz): Remove this when all the dependencies are removed.
250 virtual void PostAt(const Location& posted_from,
251 uint32_t tstamp,
252 MessageHandler* phandler,
253 uint32_t id = 0,
254 MessageData* pdata = nullptr);
255 virtual void Clear(MessageHandler* phandler,
256 uint32_t id = MQID_ANY,
257 MessageList* removed = nullptr);
Yves Gerey665174f2018-06-19 15:03:05 +0200258 virtual void Dispatch(Message* pmsg);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200259 virtual void ReceiveSends();
260
261 // Amount of time until the next message can be retrieved
262 virtual int GetDelay();
263
264 bool empty() const { return size() == 0u; }
265 size_t size() const {
266 CritScope cs(&crit_); // msgq_.size() is not thread safe.
267 return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
268 }
269
270 // Internally posts a message which causes the doomed object to be deleted
Yves Gerey665174f2018-06-19 15:03:05 +0200271 template <class T>
272 void Dispose(T* doomed) {
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200273 if (doomed) {
274 Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
275 }
276 }
277
278 // When this signal is sent out, any references to this queue should
279 // no longer be used.
280 sigslot::signal0<> SignalQueueDestroyed;
281
282 protected:
283 class PriorityQueue : public std::priority_queue<DelayedMessage> {
284 public:
285 container_type& container() { return c; }
286 void reheap() { make_heap(c.begin(), c.end(), comp); }
287 };
288
289 void DoDelayPost(const Location& posted_from,
290 int64_t cmsDelay,
291 int64_t tstamp,
292 MessageHandler* phandler,
293 uint32_t id,
294 MessageData* pdata);
295
296 // Perform initialization, subclasses must call this from their constructor
297 // if false was passed as init_queue to the MessageQueue constructor.
298 void DoInit();
299
Niels Möller5e007b72018-09-07 12:35:44 +0200300 // Does not take any lock. Must be called either while holding crit_, or by
301 // the destructor (by definition, the latter has exclusive access).
302 void ClearInternal(MessageHandler* phandler,
303 uint32_t id,
304 MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
305
306 // Perform cleanup; subclasses must call this from the destructor,
307 // and are not expected to actually hold the lock.
308 void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200309
310 void WakeUpSocketServer();
311
312 bool fPeekKeep_;
313 Message msgPeek_;
danilchap3c6abd22017-09-06 05:46:29 -0700314 MessageList msgq_ RTC_GUARDED_BY(crit_);
315 PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
316 uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200317 CriticalSection crit_;
318 bool fInitialized_;
319 bool fDestroyed_;
320
321 private:
322 volatile int stop_;
323
324 // The SocketServer might not be owned by MessageQueue.
325 SocketServer* const ss_;
326 // Used if SocketServer ownership lies with |this|.
327 std::unique_ptr<SocketServer> own_ss_;
328
329 RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue);
330};
331
332} // namespace rtc
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000333
Steve Anton10542f22019-01-11 09:11:00 -0800334#endif // RTC_BASE_MESSAGE_QUEUE_H_