blob: 3d9188ef3383933e9727874465aab795accdb40d [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020011#include "rtc_base/task_queue.h"
tommic06b1332016-05-14 11:31:40 -070012
13#include <fcntl.h>
tommi8c80c6e2017-02-23 00:34:52 -080014#include <signal.h>
tommic06b1332016-05-14 11:31:40 -070015#include <string.h>
16#include <unistd.h>
17
18#include "base/third_party/libevent/event.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020019#include "rtc_base/checks.h"
20#include "rtc_base/logging.h"
21#include "rtc_base/platform_thread.h"
22#include "rtc_base/refcount.h"
23#include "rtc_base/refcountedobject.h"
24#include "rtc_base/safe_conversions.h"
25#include "rtc_base/task_queue.h"
26#include "rtc_base/task_queue_posix.h"
27#include "rtc_base/timeutils.h"
tommic06b1332016-05-14 11:31:40 -070028
29namespace rtc {
30using internal::GetQueuePtrTls;
31using internal::AutoSetCurrentQueuePtr;
32
33namespace {
34static const char kQuit = 1;
35static const char kRunTask = 2;
tommi8c80c6e2017-02-23 00:34:52 -080036static const char kRunReplyTask = 3;
37
tommic9bb7912017-02-24 10:42:14 -080038using Priority = TaskQueue::Priority;
39
// Blocks delivery of SIGPIPE on the calling thread.
//
// SIGPIPE can be raised when write() is called on a pipe that is being
// closed, or when a pipe that is being written to gets closed. That can
// happen here (e.g. a reply task never gets a chance to run because its task
// queue is being deleted), and since the default action for SIGPIPE is to
// terminate the process, the signal is blocked and execution carries on.
//
// Side note: it would be nice to restore the previous signal mask afterwards,
// but the act of restoring it can itself cause SIGPIPE to be signaled (e.g.
// on MacOS), so that risk is not taken.
//
// An alternative to this approach is to ignore the signal for the whole
// process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  // Only adds to the thread's mask; the previous mask is not retrieved.
  pthread_sigmask(SIG_BLOCK, &blocked_signals, nullptr);
}
tommic06b1332016-05-14 11:31:40 -070060
61struct TimerEvent {
62 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
63 : task(std::move(task)) {}
64 ~TimerEvent() { event_del(&ev); }
65 event ev;
66 std::unique_ptr<QueuedTask> task;
67};
68
69bool SetNonBlocking(int fd) {
70 const int flags = fcntl(fd, F_GETFL);
71 RTC_CHECK(flags != -1);
72 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
73}
tommi1666b612016-07-13 10:58:12 -070074
75// TODO(tommi): This is a hack to support two versions of libevent that we're
76// compatible with. The method we really want to call is event_assign(),
77// since event_set() has been marked as deprecated (and doesn't accept
78// passing event_base__ as a parameter). However, the version of libevent
79// that we have in Chromium, doesn't have event_assign(), so we need to call
80// event_set() there.
81void EventAssign(struct event* ev,
82 struct event_base* base,
83 int fd,
84 short events,
85 void (*callback)(int, short, void*),
86 void* arg) {
87#if defined(_EVENT2_EVENT_H_)
88 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
89#else
90 event_set(ev, fd, events, callback, arg);
91 RTC_CHECK_EQ(0, event_base_set(base, ev));
92#endif
93}
tommic9bb7912017-02-24 10:42:14 -080094
95ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
96 switch (priority) {
97 case Priority::HIGH:
98 return kRealtimePriority;
99 case Priority::LOW:
100 return kLowPriority;
101 case Priority::NORMAL:
102 return kNormalPriority;
103 default:
104 RTC_NOTREACHED();
105 break;
106 }
107 return kNormalPriority;
108}
tommic06b1332016-05-14 11:31:40 -0700109} // namespace
110
perkj650fdae2017-08-25 05:00:11 -0700111class TaskQueue::Impl : public RefCountInterface {
112 public:
113 explicit Impl(const char* queue_name,
114 TaskQueue* queue,
115 Priority priority = Priority::NORMAL);
116 ~Impl() override;
117
118 static TaskQueue::Impl* Current();
119 static TaskQueue* CurrentQueue();
120
121 // Used for DCHECKing the current queue.
perkj650fdae2017-08-25 05:00:11 -0700122 bool IsCurrent() const;
123
124 void PostTask(std::unique_ptr<QueuedTask> task);
125 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
126 std::unique_ptr<QueuedTask> reply,
127 TaskQueue::Impl* reply_queue);
128
129 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
130
131 private:
132 static void ThreadMain(void* context);
133 static void OnWakeup(int socket, short flags, void* context); // NOLINT
134 static void RunTask(int fd, short flags, void* context); // NOLINT
135 static void RunTimer(int fd, short flags, void* context); // NOLINT
136
137 class ReplyTaskOwner;
138 class PostAndReplyTask;
139 class SetTimerTask;
140
141 typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
142
143 void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
144
145 struct QueueContext;
146 TaskQueue* const queue_;
147 int wakeup_pipe_in_ = -1;
148 int wakeup_pipe_out_ = -1;
149 event_base* event_base_;
150 std::unique_ptr<event> wakeup_event_;
151 PlatformThread thread_;
152 rtc::CriticalSection pending_lock_;
danilchap3c6abd22017-09-06 05:46:29 -0700153 std::list<std::unique_ptr<QueuedTask>> pending_ RTC_GUARDED_BY(pending_lock_);
perkj650fdae2017-08-25 05:00:11 -0700154 std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
danilchap3c6abd22017-09-06 05:46:29 -0700155 RTC_GUARDED_BY(pending_lock_);
perkj650fdae2017-08-25 05:00:11 -0700156};
157
158struct TaskQueue::Impl::QueueContext {
159 explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
160 TaskQueue::Impl* queue;
tommic06b1332016-05-14 11:31:40 -0700161 bool is_active;
162 // Holds a list of events pending timers for cleanup when the loop exits.
163 std::list<TimerEvent*> pending_timers_;
164};
165
tommi8c80c6e2017-02-23 00:34:52 -0800166// Posting a reply task is tricky business. This class owns the reply task
167// and a reference to it is held by both the reply queue and the first task.
168// Here's an outline of what happens when dealing with a reply task.
169// * The ReplyTaskOwner owns the |reply_| task.
170// * One ref owned by PostAndReplyTask
171// * One ref owned by the reply TaskQueue
172// * ReplyTaskOwner has a flag |run_task_| initially set to false.
173// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
174// * After successfully running the original |task_|, PostAndReplyTask() calls
175// set_should_run_task(). This sets |run_task_| to true.
176// * In PostAndReplyTask's dtor:
177// * It releases its reference to ReplyTaskOwner (important to do this first).
178// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
179// * PostAndReplyTask doesn't care if write() fails, but when it does:
180// * The reply queue is gone.
181// * ReplyTaskOwner has already been deleted and the reply task too.
182// * If write() succeeds:
183// * ReplyQueue receives the kRunReplyTask message
184// * Goes through all pending tasks, finding the first that HasOneRef()
185// * Calls ReplyTaskOwner::Run()
186// * if set_should_run_task() was called, the reply task will be run
187// * Release the reference to ReplyTaskOwner
188// * ReplyTaskOwner and associated |reply_| are deleted.
perkj650fdae2017-08-25 05:00:11 -0700189class TaskQueue::Impl::ReplyTaskOwner {
tommi8c80c6e2017-02-23 00:34:52 -0800190 public:
191 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
192 : reply_(std::move(reply)) {}
193
194 void Run() {
195 RTC_DCHECK(reply_);
196 if (run_task_) {
197 if (!reply_->Run())
198 reply_.release();
199 }
200 reply_.reset();
201 }
202
203 void set_should_run_task() {
204 RTC_DCHECK(!run_task_);
205 run_task_ = true;
206 }
207
208 private:
209 std::unique_ptr<QueuedTask> reply_;
210 bool run_task_ = false;
211};
212
perkj650fdae2017-08-25 05:00:11 -0700213class TaskQueue::Impl::PostAndReplyTask : public QueuedTask {
tommic06b1332016-05-14 11:31:40 -0700214 public:
215 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
216 std::unique_ptr<QueuedTask> reply,
perkj650fdae2017-08-25 05:00:11 -0700217 TaskQueue::Impl* reply_queue,
tommi8c80c6e2017-02-23 00:34:52 -0800218 int reply_pipe)
tommic06b1332016-05-14 11:31:40 -0700219 : task_(std::move(task)),
tommi8c80c6e2017-02-23 00:34:52 -0800220 reply_pipe_(reply_pipe),
221 reply_task_owner_(
222 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
223 reply_queue->PrepareReplyTask(reply_task_owner_);
tommic06b1332016-05-14 11:31:40 -0700224 }
225
226 ~PostAndReplyTask() override {
tommi8c80c6e2017-02-23 00:34:52 -0800227 reply_task_owner_ = nullptr;
228 IgnoreSigPipeSignalOnCurrentThread();
229 // Send a signal to the reply queue that the reply task can run now.
230 // Depending on whether |set_should_run_task()| was called by the
231 // PostAndReplyTask(), the reply task may or may not actually run.
232 // In either case, it will be deleted.
233 char message = kRunReplyTask;
234 write(reply_pipe_, &message, sizeof(message));
tommic06b1332016-05-14 11:31:40 -0700235 }
236
237 private:
238 bool Run() override {
239 if (!task_->Run())
240 task_.release();
tommi8c80c6e2017-02-23 00:34:52 -0800241 reply_task_owner_->set_should_run_task();
tommic06b1332016-05-14 11:31:40 -0700242 return true;
243 }
244
tommic06b1332016-05-14 11:31:40 -0700245 std::unique_ptr<QueuedTask> task_;
tommi8c80c6e2017-02-23 00:34:52 -0800246 int reply_pipe_;
247 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
tommic06b1332016-05-14 11:31:40 -0700248};
249
perkj650fdae2017-08-25 05:00:11 -0700250class TaskQueue::Impl::SetTimerTask : public QueuedTask {
tommic06b1332016-05-14 11:31:40 -0700251 public:
252 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
253 : task_(std::move(task)),
254 milliseconds_(milliseconds),
255 posted_(Time32()) {}
256
257 private:
258 bool Run() override {
259 // Compensate for the time that has passed since construction
260 // and until we got here.
261 uint32_t post_time = Time32() - posted_;
perkj650fdae2017-08-25 05:00:11 -0700262 TaskQueue::Impl::Current()->PostDelayedTask(
tommic06b1332016-05-14 11:31:40 -0700263 std::move(task_),
264 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
265 return true;
266 }
267
268 std::unique_ptr<QueuedTask> task_;
269 const uint32_t milliseconds_;
270 const uint32_t posted_;
271};
272
perkj650fdae2017-08-25 05:00:11 -0700273TaskQueue::Impl::Impl(const char* queue_name,
274 TaskQueue* queue,
275 Priority priority /*= NORMAL*/)
276 : queue_(queue),
277 event_base_(event_base_new()),
tommic06b1332016-05-14 11:31:40 -0700278 wakeup_event_(new event()),
perkj650fdae2017-08-25 05:00:11 -0700279 thread_(&TaskQueue::Impl::ThreadMain,
tommic9bb7912017-02-24 10:42:14 -0800280 this,
281 queue_name,
282 TaskQueuePriorityToThreadPriority(priority)) {
tommic06b1332016-05-14 11:31:40 -0700283 RTC_DCHECK(queue_name);
284 int fds[2];
285 RTC_CHECK(pipe(fds) == 0);
286 SetNonBlocking(fds[0]);
287 SetNonBlocking(fds[1]);
288 wakeup_pipe_out_ = fds[0];
289 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800290
tommi1666b612016-07-13 10:58:12 -0700291 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
292 EV_READ | EV_PERSIST, OnWakeup, this);
tommic06b1332016-05-14 11:31:40 -0700293 event_add(wakeup_event_.get(), 0);
294 thread_.Start();
295}
296
perkj650fdae2017-08-25 05:00:11 -0700297TaskQueue::Impl::~Impl() {
tommic06b1332016-05-14 11:31:40 -0700298 RTC_DCHECK(!IsCurrent());
299 struct timespec ts;
300 char message = kQuit;
301 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
302 // The queue is full, so we have no choice but to wait and retry.
303 RTC_CHECK_EQ(EAGAIN, errno);
304 ts.tv_sec = 0;
305 ts.tv_nsec = 1000000;
306 nanosleep(&ts, nullptr);
307 }
308
309 thread_.Stop();
310
311 event_del(wakeup_event_.get());
tommi8c80c6e2017-02-23 00:34:52 -0800312
313 IgnoreSigPipeSignalOnCurrentThread();
314
tommic06b1332016-05-14 11:31:40 -0700315 close(wakeup_pipe_in_);
316 close(wakeup_pipe_out_);
317 wakeup_pipe_in_ = -1;
318 wakeup_pipe_out_ = -1;
319
tommic06b1332016-05-14 11:31:40 -0700320 event_base_free(event_base_);
321}
322
323// static
perkj650fdae2017-08-25 05:00:11 -0700324TaskQueue::Impl* TaskQueue::Impl::Current() {
tommic06b1332016-05-14 11:31:40 -0700325 QueueContext* ctx =
326 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
327 return ctx ? ctx->queue : nullptr;
328}
329
330// static
perkj650fdae2017-08-25 05:00:11 -0700331TaskQueue* TaskQueue::Impl::CurrentQueue() {
332 TaskQueue::Impl* current = Current();
333 if (current) {
334 return current->queue_;
335 }
336 return nullptr;
337}
338
perkj650fdae2017-08-25 05:00:11 -0700339bool TaskQueue::Impl::IsCurrent() const {
tommic06b1332016-05-14 11:31:40 -0700340 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
341}
342
perkj650fdae2017-08-25 05:00:11 -0700343void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
tommic06b1332016-05-14 11:31:40 -0700344 RTC_DCHECK(task.get());
345 // libevent isn't thread safe. This means that we can't use methods such
346 // as event_base_once to post tasks to the worker thread from a different
347 // thread. However, we can use it when posting from the worker thread itself.
348 if (IsCurrent()) {
perkj650fdae2017-08-25 05:00:11 -0700349 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
tommic06b1332016-05-14 11:31:40 -0700350 task.get(), nullptr) == 0) {
351 task.release();
352 }
353 } else {
354 QueuedTask* task_id = task.get(); // Only used for comparison.
355 {
356 CritScope lock(&pending_lock_);
357 pending_.push_back(std::move(task));
358 }
359 char message = kRunTask;
360 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
361 LOG(WARNING) << "Failed to queue task.";
362 CritScope lock(&pending_lock_);
363 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
364 return t.get() == task_id;
365 });
366 }
367 }
368}
369
perkj650fdae2017-08-25 05:00:11 -0700370void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
371 uint32_t milliseconds) {
tommic06b1332016-05-14 11:31:40 -0700372 if (IsCurrent()) {
373 TimerEvent* timer = new TimerEvent(std::move(task));
perkj650fdae2017-08-25 05:00:11 -0700374 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
375 timer);
tommic06b1332016-05-14 11:31:40 -0700376 QueueContext* ctx =
377 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
378 ctx->pending_timers_.push_back(timer);
kwiberg5b9746e2017-08-16 04:52:35 -0700379 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
380 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
tommic06b1332016-05-14 11:31:40 -0700381 event_add(&timer->ev, &tv);
382 } else {
383 PostTask(std::unique_ptr<QueuedTask>(
384 new SetTimerTask(std::move(task), milliseconds)));
385 }
386}
387
perkj650fdae2017-08-25 05:00:11 -0700388void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
389 std::unique_ptr<QueuedTask> reply,
390 TaskQueue::Impl* reply_queue) {
tommic06b1332016-05-14 11:31:40 -0700391 std::unique_ptr<QueuedTask> wrapper_task(
tommi8c80c6e2017-02-23 00:34:52 -0800392 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
393 reply_queue->wakeup_pipe_in_));
tommic06b1332016-05-14 11:31:40 -0700394 PostTask(std::move(wrapper_task));
395}
396
tommic06b1332016-05-14 11:31:40 -0700397// static
perkj650fdae2017-08-25 05:00:11 -0700398void TaskQueue::Impl::ThreadMain(void* context) {
399 TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
tommic06b1332016-05-14 11:31:40 -0700400
401 QueueContext queue_context(me);
402 pthread_setspecific(GetQueuePtrTls(), &queue_context);
403
404 while (queue_context.is_active)
405 event_base_loop(me->event_base_, 0);
406
407 pthread_setspecific(GetQueuePtrTls(), nullptr);
408
409 for (TimerEvent* timer : queue_context.pending_timers_)
410 delete timer;
tommic06b1332016-05-14 11:31:40 -0700411}
412
413// static
perkj650fdae2017-08-25 05:00:11 -0700414void TaskQueue::Impl::OnWakeup(int socket,
415 short flags,
416 void* context) { // NOLINT
tommic06b1332016-05-14 11:31:40 -0700417 QueueContext* ctx =
418 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
419 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
420 char buf;
421 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
422 switch (buf) {
423 case kQuit:
424 ctx->is_active = false;
425 event_base_loopbreak(ctx->queue->event_base_);
426 break;
427 case kRunTask: {
428 std::unique_ptr<QueuedTask> task;
429 {
430 CritScope lock(&ctx->queue->pending_lock_);
431 RTC_DCHECK(!ctx->queue->pending_.empty());
432 task = std::move(ctx->queue->pending_.front());
433 ctx->queue->pending_.pop_front();
434 RTC_DCHECK(task.get());
435 }
436 if (!task->Run())
437 task.release();
438 break;
439 }
tommi8c80c6e2017-02-23 00:34:52 -0800440 case kRunReplyTask: {
441 scoped_refptr<ReplyTaskOwnerRef> reply_task;
442 {
443 CritScope lock(&ctx->queue->pending_lock_);
444 for (auto it = ctx->queue->pending_replies_.begin();
445 it != ctx->queue->pending_replies_.end(); ++it) {
446 if ((*it)->HasOneRef()) {
447 reply_task = std::move(*it);
448 ctx->queue->pending_replies_.erase(it);
449 break;
450 }
451 }
452 }
453 reply_task->Run();
454 break;
455 }
tommic06b1332016-05-14 11:31:40 -0700456 default:
457 RTC_NOTREACHED();
458 break;
459 }
460}
461
462// static
perkj650fdae2017-08-25 05:00:11 -0700463void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT
tommic06b1332016-05-14 11:31:40 -0700464 auto* task = static_cast<QueuedTask*>(context);
465 if (task->Run())
466 delete task;
467}
468
469// static
perkj650fdae2017-08-25 05:00:11 -0700470void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT
tommic06b1332016-05-14 11:31:40 -0700471 TimerEvent* timer = static_cast<TimerEvent*>(context);
472 if (!timer->task->Run())
473 timer->task.release();
474 QueueContext* ctx =
475 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
476 ctx->pending_timers_.remove(timer);
477 delete timer;
478}
479
perkj650fdae2017-08-25 05:00:11 -0700480void TaskQueue::Impl::PrepareReplyTask(
481 scoped_refptr<ReplyTaskOwnerRef> reply_task) {
tommic06b1332016-05-14 11:31:40 -0700482 RTC_DCHECK(reply_task);
483 CritScope lock(&pending_lock_);
tommi8c80c6e2017-02-23 00:34:52 -0800484 pending_replies_.push_back(std::move(reply_task));
tommic06b1332016-05-14 11:31:40 -0700485}
486
perkj650fdae2017-08-25 05:00:11 -0700487TaskQueue::TaskQueue(const char* queue_name, Priority priority)
488 : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
489}
490
491TaskQueue::~TaskQueue() {}
492
493// static
494TaskQueue* TaskQueue::Current() {
495 return TaskQueue::Impl::CurrentQueue();
496}
497
498// Used for DCHECKing the current queue.
perkj650fdae2017-08-25 05:00:11 -0700499bool TaskQueue::IsCurrent() const {
500 return impl_->IsCurrent();
501}
502
503void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
504 return TaskQueue::impl_->PostTask(std::move(task));
505}
506
507void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
508 std::unique_ptr<QueuedTask> reply,
509 TaskQueue* reply_queue) {
510 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
511 reply_queue->impl_.get());
512}
513
514void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
515 std::unique_ptr<QueuedTask> reply) {
516 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
517 impl_.get());
518}
519
520void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
521 uint32_t milliseconds) {
522 return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
523}
524
tommic06b1332016-05-14 11:31:40 -0700525} // namespace rtc