blob: ad7a50327b900bf250ab231381437ef6c52a9288 [file] [log] [blame]
eladalon413ee9a2017-08-22 04:02:52 -07001/*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020011#include "test/single_threaded_task_queue.h"
eladalon413ee9a2017-08-22 04:02:52 -070012
13#include <utility>
14
Karl Wiberg918f50c2018-07-05 11:40:33 +020015#include "absl/memory/memory.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020016#include "rtc_base/checks.h"
Karl Wiberge40468b2017-11-22 10:42:26 +010017#include "rtc_base/numerics/safe_conversions.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020018#include "rtc_base/timeutils.h"
eladalon413ee9a2017-08-22 04:02:52 -070019
20namespace webrtc {
21namespace test {
22
23SingleThreadedTaskQueueForTesting::QueuedTask::QueuedTask(
24 SingleThreadedTaskQueueForTesting::TaskId task_id,
25 int64_t earliest_execution_time,
26 SingleThreadedTaskQueueForTesting::Task task)
27 : task_id(task_id),
28 earliest_execution_time(earliest_execution_time),
29 task(task) {}
30
31SingleThreadedTaskQueueForTesting::QueuedTask::~QueuedTask() = default;
32
33SingleThreadedTaskQueueForTesting::SingleThreadedTaskQueueForTesting(
34 const char* name)
35 : thread_(Run, this, name),
36 running_(true),
37 next_task_id_(0),
38 wake_up_(false, false) {
39 thread_.Start();
40}
41
42SingleThreadedTaskQueueForTesting::~SingleThreadedTaskQueueForTesting() {
43 RTC_DCHECK_RUN_ON(&owner_thread_checker_);
44 {
45 rtc::CritScope lock(&cs_);
46 running_ = false;
47 }
48 wake_up_.Set();
49 thread_.Stop();
50}
51
52SingleThreadedTaskQueueForTesting::TaskId
53SingleThreadedTaskQueueForTesting::PostTask(Task task) {
54 return PostDelayedTask(task, 0);
55}
56
57SingleThreadedTaskQueueForTesting::TaskId
58SingleThreadedTaskQueueForTesting::PostDelayedTask(Task task,
59 int64_t delay_ms) {
60 int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
61
62 rtc::CritScope lock(&cs_);
63
64 TaskId id = next_task_id_++;
65
66 // Insert after any other tasks with an earlier-or-equal target time.
67 auto it = tasks_.begin();
68 for (; it != tasks_.end(); it++) {
69 if (earliest_exec_time < (*it)->earliest_execution_time) {
70 break;
71 }
72 }
Karl Wiberg918f50c2018-07-05 11:40:33 +020073 tasks_.insert(it,
74 absl::make_unique<QueuedTask>(id, earliest_exec_time, task));
eladalon413ee9a2017-08-22 04:02:52 -070075
76 // This class is optimized for simplicty, not for performance. This will wake
77 // the thread up even if the next task in the queue is only scheduled for
78 // quite some time from now. In that case, the thread will just send itself
79 // back to sleep.
80 wake_up_.Set();
81
82 return id;
83}
84
85void SingleThreadedTaskQueueForTesting::SendTask(Task task) {
86 rtc::Event done(true, false);
87 PostTask([&task, &done]() {
88 task();
89 done.Set();
90 });
91 done.Wait(rtc::Event::kForever);
92}
93
94bool SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
95 rtc::CritScope lock(&cs_);
96 for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
97 if ((*it)->task_id == task_id) {
98 tasks_.erase(it);
99 return true;
100 }
101 }
102 return false;
103}
104
105void SingleThreadedTaskQueueForTesting::Run(void* obj) {
106 static_cast<SingleThreadedTaskQueueForTesting*>(obj)->RunLoop();
107}
108
109void SingleThreadedTaskQueueForTesting::RunLoop() {
110 while (true) {
111 std::unique_ptr<QueuedTask> queued_task;
112
113 // An empty queue would lead to sleeping until the queue becoems non-empty.
Mirko Bonadeidca82bc2017-12-13 18:44:59 +0100114 // A queue where the earliest task is scheduled for later than now, will
eladalon413ee9a2017-08-22 04:02:52 -0700115 // lead to sleeping until the time of the next scheduled task (or until
116 // more tasks are scheduled).
117 int wait_time = rtc::Event::kForever;
118
119 {
120 rtc::CritScope lock(&cs_);
121 if (!running_) {
122 return;
123 }
124 if (!tasks_.empty()) {
125 int64_t remaining_delay_ms = rtc::TimeDiff(
126 tasks_.front()->earliest_execution_time, rtc::TimeMillis());
127 if (remaining_delay_ms <= 0) {
128 queued_task = std::move(tasks_.front());
129 tasks_.pop_front();
130 } else {
131 wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
132 }
133 }
134 }
135
136 if (queued_task) {
137 queued_task->task();
138 } else {
139 wake_up_.Wait(wait_time);
140 }
141 }
142}
143
144} // namespace test
145} // namespace webrtc