blob: 5a6bd0a842d1becc9e16d82a92ab75fcad706490 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Steve Anton10542f22019-01-11 09:11:00 -080011#ifndef RTC_BASE_MESSAGE_QUEUE_H_
12#define RTC_BASE_MESSAGE_QUEUE_H_
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000013
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020014#include <string.h>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000015
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020016#include <algorithm>
17#include <list>
18#include <memory>
19#include <queue>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020020#include <vector>
21
Mirko Bonadeid9708072019-01-25 20:26:48 +010022#include "api/scoped_refptr.h"
Steve Anton10542f22019-01-11 09:11:00 -080023#include "rtc_base/constructor_magic.h"
24#include "rtc_base/critical_section.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020025#include "rtc_base/location.h"
Steve Anton10542f22019-01-11 09:11:00 -080026#include "rtc_base/message_handler.h"
Steve Anton10542f22019-01-11 09:11:00 -080027#include "rtc_base/socket_server.h"
Artem Titove41c4332018-07-25 15:04:28 +020028#include "rtc_base/third_party/sigslot/sigslot.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020029#include "rtc_base/thread_annotations.h"
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020030
31namespace rtc {
32
33struct Message;
34class MessageQueue;
35
36// MessageQueueManager does cleanup of of message queues
37
38class MessageQueueManager {
39 public:
Yves Gerey665174f2018-06-19 15:03:05 +020040 static void Add(MessageQueue* message_queue);
41 static void Remove(MessageQueue* message_queue);
42 static void Clear(MessageHandler* handler);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020043
Niels Möller8909a632018-09-06 08:42:44 +020044 // TODO(nisse): Delete alias, as soon as downstream code is updated.
45 static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); }
46
47 // For testing purposes, for use with a simulated clock.
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020048 // Ensures that all message queues have processed delayed messages
49 // up until the current point in time.
Niels Möller8909a632018-09-06 08:42:44 +020050 static void ProcessAllMessageQueuesForTesting();
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020051
52 private:
53 static MessageQueueManager* Instance();
54
55 MessageQueueManager();
56 ~MessageQueueManager();
57
Yves Gerey665174f2018-06-19 15:03:05 +020058 void AddInternal(MessageQueue* message_queue);
59 void RemoveInternal(MessageQueue* message_queue);
60 void ClearInternal(MessageHandler* handler);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020061 void ProcessAllMessageQueuesInternal();
62
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020063 // This list contains all live MessageQueues.
danilchap3c6abd22017-09-06 05:46:29 -070064 std::vector<MessageQueue*> message_queues_ RTC_GUARDED_BY(crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020065
jbauch5b361732017-07-06 23:51:37 -070066 // Methods that don't modify the list of message queues may be called in a
67 // re-entrant fashion. "processing_" keeps track of the depth of re-entrant
68 // calls.
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020069 CriticalSection crit_;
danilchap3c6abd22017-09-06 05:46:29 -070070 size_t processing_ RTC_GUARDED_BY(crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020071};
72
// Polymorphic base class for the payload attached to a Message. Derive from
// this for specialized data. The application manages the lifetime of the
// payload, except when messages are purged.
class MessageData {
 public:
  MessageData() = default;
  // Virtual so that a payload can be destroyed through a MessageData*.
  virtual ~MessageData() = default;
};
81
82template <class T>
83class TypedMessageData : public MessageData {
84 public:
Yves Gerey665174f2018-06-19 15:03:05 +020085 explicit TypedMessageData(const T& data) : data_(data) {}
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020086 const T& data() const { return data_; }
87 T& data() { return data_; }
Yves Gerey665174f2018-06-19 15:03:05 +020088
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +020089 private:
90 T data_;
91};
92
93// Like TypedMessageData, but for pointers that require a delete.
94template <class T>
95class ScopedMessageData : public MessageData {
96 public:
97 explicit ScopedMessageData(std::unique_ptr<T> data)
98 : data_(std::move(data)) {}
99 // Deprecated.
100 // TODO(deadbeef): Remove this once downstream applications stop using it.
101 explicit ScopedMessageData(T* data) : data_(data) {}
102 // Deprecated.
103 // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of
104 // this once downstream applications stop using it, then rename inner_data to
105 // just data.
106 const std::unique_ptr<T>& data() const { return data_; }
107 std::unique_ptr<T>& data() { return data_; }
108
109 const T& inner_data() const { return *data_; }
110 T& inner_data() { return *data_; }
111
112 private:
113 std::unique_ptr<T> data_;
114};
115
116// Like ScopedMessageData, but for reference counted pointers.
117template <class T>
118class ScopedRefMessageData : public MessageData {
119 public:
Yves Gerey665174f2018-06-19 15:03:05 +0200120 explicit ScopedRefMessageData(T* data) : data_(data) {}
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200121 const scoped_refptr<T>& data() const { return data_; }
122 scoped_refptr<T>& data() { return data_; }
Yves Gerey665174f2018-06-19 15:03:05 +0200123
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200124 private:
125 scoped_refptr<T> data_;
126};
127
Yves Gerey665174f2018-06-19 15:03:05 +0200128template <class T>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200129inline MessageData* WrapMessageData(const T& data) {
130 return new TypedMessageData<T>(data);
131}
132
Yves Gerey665174f2018-06-19 15:03:05 +0200133template <class T>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200134inline const T& UseMessageData(MessageData* data) {
Yves Gerey665174f2018-06-19 15:03:05 +0200135 return static_cast<TypedMessageData<T>*>(data)->data();
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200136}
137
Yves Gerey665174f2018-06-19 15:03:05 +0200138template <class T>
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200139class DisposeData : public MessageData {
140 public:
Yves Gerey665174f2018-06-19 15:03:05 +0200141 explicit DisposeData(T* data) : data_(data) {}
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200142 virtual ~DisposeData() { delete data_; }
Yves Gerey665174f2018-06-19 15:03:05 +0200143
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200144 private:
145 T* data_;
146};
147
// Wildcard message id: Message::Match() accepts every message id when asked
// for MQID_ANY, and it is the default id of MessageQueue::Clear().
// Equal to static_cast<uint32_t>(-1).
const uint32_t MQID_ANY = 0xFFFFFFFFu;
// Message id used by MessageQueue::Dispose() for the message that carries a
// DisposeData payload. Equal to static_cast<uint32_t>(-2).
const uint32_t MQID_DISPOSE = 0xFFFFFFFEu;
150
151// No destructor
152
153struct Message {
154 Message()
155 : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {}
156 inline bool Match(MessageHandler* handler, uint32_t id) const {
157 return (handler == nullptr || handler == phandler) &&
158 (id == MQID_ANY || id == message_id);
159 }
160 Location posted_from;
Yves Gerey665174f2018-06-19 15:03:05 +0200161 MessageHandler* phandler;
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200162 uint32_t message_id;
Yves Gerey665174f2018-06-19 15:03:05 +0200163 MessageData* pdata;
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200164 int64_t ts_sensitive;
165};
166
167typedef std::list<Message> MessageList;
168
169// DelayedMessage goes into a priority queue, sorted by trigger time. Messages
170// with the same trigger time are processed in num_ (FIFO) order.
171
172class DelayedMessage {
173 public:
174 DelayedMessage(int64_t delay,
175 int64_t trigger,
176 uint32_t num,
177 const Message& msg)
178 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {}
179
Yves Gerey665174f2018-06-19 15:03:05 +0200180 bool operator<(const DelayedMessage& dmsg) const {
181 return (dmsg.msTrigger_ < msTrigger_) ||
182 ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_));
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200183 }
184
185 int64_t cmsDelay_; // for debugging
186 int64_t msTrigger_;
187 uint32_t num_;
188 Message msg_;
189};
190
191class MessageQueue {
192 public:
193 static const int kForever = -1;
194
195 // Create a new MessageQueue and optionally assign it to the passed
196 // SocketServer. Subclasses that override Clear should pass false for
197 // init_queue and call DoInit() from their constructor to prevent races
198 // with the MessageQueueManager using the object while the vtable is still
199 // being created.
200 MessageQueue(SocketServer* ss, bool init_queue);
201 MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue);
202
203 // NOTE: SUBCLASSES OF MessageQueue THAT OVERRIDE Clear MUST CALL
204 // DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race
205 // between the destructor modifying the vtable, and the MessageQueueManager
206 // calling Clear on the object from a different thread.
207 virtual ~MessageQueue();
208
209 SocketServer* socketserver();
210
211 // Note: The behavior of MessageQueue has changed. When a MQ is stopped,
212 // futher Posts and Sends will fail. However, any pending Sends and *ready*
213 // Posts (as opposed to unexpired delayed Posts) will be delivered before
214 // Get (or Peek) returns false. By guaranteeing delivery of those messages,
215 // we eliminate the race condition when an MessageHandler and MessageQueue
216 // may be destroyed independently of each other.
217 virtual void Quit();
218 virtual bool IsQuitting();
219 virtual void Restart();
220 // Not all message queues actually process messages (such as SignalThread).
221 // In those cases, it's important to know, before posting, that it won't be
222 // Processed. Normally, this would be true until IsQuitting() is true.
Niels Möller8909a632018-09-06 08:42:44 +0200223 virtual bool IsProcessingMessagesForTesting();
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200224
225 // Get() will process I/O until:
226 // 1) A message is available (returns true)
227 // 2) cmsWait seconds have elapsed (returns false)
228 // 3) Stop() is called (returns false)
Yves Gerey665174f2018-06-19 15:03:05 +0200229 virtual bool Get(Message* pmsg,
230 int cmsWait = kForever,
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200231 bool process_io = true);
Yves Gerey665174f2018-06-19 15:03:05 +0200232 virtual bool Peek(Message* pmsg, int cmsWait = 0);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200233 virtual void Post(const Location& posted_from,
234 MessageHandler* phandler,
235 uint32_t id = 0,
236 MessageData* pdata = nullptr,
237 bool time_sensitive = false);
238 virtual void PostDelayed(const Location& posted_from,
239 int cmsDelay,
240 MessageHandler* phandler,
241 uint32_t id = 0,
242 MessageData* pdata = nullptr);
243 virtual void PostAt(const Location& posted_from,
244 int64_t tstamp,
245 MessageHandler* phandler,
246 uint32_t id = 0,
247 MessageData* pdata = nullptr);
248 // TODO(honghaiz): Remove this when all the dependencies are removed.
249 virtual void PostAt(const Location& posted_from,
250 uint32_t tstamp,
251 MessageHandler* phandler,
252 uint32_t id = 0,
253 MessageData* pdata = nullptr);
254 virtual void Clear(MessageHandler* phandler,
255 uint32_t id = MQID_ANY,
256 MessageList* removed = nullptr);
Yves Gerey665174f2018-06-19 15:03:05 +0200257 virtual void Dispatch(Message* pmsg);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200258 virtual void ReceiveSends();
259
260 // Amount of time until the next message can be retrieved
261 virtual int GetDelay();
262
263 bool empty() const { return size() == 0u; }
264 size_t size() const {
265 CritScope cs(&crit_); // msgq_.size() is not thread safe.
266 return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
267 }
268
269 // Internally posts a message which causes the doomed object to be deleted
Yves Gerey665174f2018-06-19 15:03:05 +0200270 template <class T>
271 void Dispose(T* doomed) {
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200272 if (doomed) {
273 Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
274 }
275 }
276
277 // When this signal is sent out, any references to this queue should
278 // no longer be used.
279 sigslot::signal0<> SignalQueueDestroyed;
280
281 protected:
282 class PriorityQueue : public std::priority_queue<DelayedMessage> {
283 public:
284 container_type& container() { return c; }
285 void reheap() { make_heap(c.begin(), c.end(), comp); }
286 };
287
288 void DoDelayPost(const Location& posted_from,
289 int64_t cmsDelay,
290 int64_t tstamp,
291 MessageHandler* phandler,
292 uint32_t id,
293 MessageData* pdata);
294
295 // Perform initialization, subclasses must call this from their constructor
296 // if false was passed as init_queue to the MessageQueue constructor.
297 void DoInit();
298
Niels Möller5e007b72018-09-07 12:35:44 +0200299 // Does not take any lock. Must be called either while holding crit_, or by
300 // the destructor (by definition, the latter has exclusive access).
301 void ClearInternal(MessageHandler* phandler,
302 uint32_t id,
303 MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
304
305 // Perform cleanup; subclasses must call this from the destructor,
306 // and are not expected to actually hold the lock.
307 void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200308
309 void WakeUpSocketServer();
310
311 bool fPeekKeep_;
312 Message msgPeek_;
danilchap3c6abd22017-09-06 05:46:29 -0700313 MessageList msgq_ RTC_GUARDED_BY(crit_);
314 PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
315 uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
Henrik Kjellanderec78f1c2017-06-29 07:52:50 +0200316 CriticalSection crit_;
317 bool fInitialized_;
318 bool fDestroyed_;
319
320 private:
321 volatile int stop_;
322
323 // The SocketServer might not be owned by MessageQueue.
324 SocketServer* const ss_;
325 // Used if SocketServer ownership lies with |this|.
326 std::unique_ptr<SocketServer> own_ss_;
327
328 RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue);
329};
330
331} // namespace rtc
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000332
Steve Anton10542f22019-01-11 09:11:00 -0800333#endif // RTC_BASE_MESSAGE_QUEUE_H_