Extract activity executor into separate class from PC level fixture impl
Bug: webrtc:11479
Change-Id: Ida9c944d928e9973bf543a2e5b415a7c9007b833
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/173024
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31032}
diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn
index f238f9b..537b6a5 100644
--- a/test/pc/e2e/BUILD.gn
+++ b/test/pc/e2e/BUILD.gn
@@ -304,6 +304,28 @@
]
}
+ rtc_library("test_activities_executor") {
+ visibility = [ "*" ]
+ testonly = true
+ sources = [
+ "test_activities_executor.cc",
+ "test_activities_executor.h",
+ ]
+ deps = [
+ "../../../api/units:time_delta",
+ "../../../api/units:timestamp",
+ "../../../rtc_base:checks",
+ "../../../rtc_base:criticalsection",
+ "../../../rtc_base:logging",
+ "../../../rtc_base:rtc_base_approved",
+ "../../../rtc_base:task_queue_for_test",
+ "../../../rtc_base/task_utils:repeating_task",
+ "../../../system_wrappers",
+ "//third_party/abseil-cpp/absl/memory",
+ "//third_party/abseil-cpp/absl/types:optional",
+ ]
+ }
+
rtc_library("peerconnection_quality_test") {
visibility = [ "*" ]
testonly = true
@@ -322,6 +344,7 @@
":sdp_changer",
":single_process_encoded_image_data_injector",
":stats_poller",
+ ":test_activities_executor",
":test_peer",
":test_peer_factory",
":video_quality_analyzer_injection_helper",
@@ -348,7 +371,6 @@
"../../../rtc_base:rtc_base_approved",
"../../../rtc_base:safe_conversions",
"../../../rtc_base:task_queue_for_test",
- "../../../rtc_base/task_utils:repeating_task",
"../../../system_wrappers",
"../../../system_wrappers:field_trial",
]
diff --git a/test/pc/e2e/peer_connection_quality_test.cc b/test/pc/e2e/peer_connection_quality_test.cc
index 8b2734d..d97eeba 100644
--- a/test/pc/e2e/peer_connection_quality_test.cc
+++ b/test/pc/e2e/peer_connection_quality_test.cc
@@ -22,7 +22,6 @@
#include "api/scoped_refptr.h"
#include "api/task_queue/default_task_queue_factory.h"
#include "api/test/video_quality_analyzer_interface.h"
-#include "api/units/time_delta.h"
#include "pc/sdp_utils.h"
#include "pc/test/mock_peer_connection_observers.h"
#include "rtc_base/bind.h"
@@ -106,7 +105,8 @@
std::unique_ptr<VideoQualityAnalyzerInterface> video_quality_analyzer)
: clock_(Clock::GetRealTimeClock()),
task_queue_factory_(CreateDefaultTaskQueueFactory()),
- test_case_name_(std::move(test_case_name)) {
+ test_case_name_(std::move(test_case_name)),
+ executor_(std::make_unique<TestActivitiesExecutor>(clock_)) {
// Create default video quality analyzer. We will always create an analyzer,
// even if there are no video streams, because it will be installed into video
// encoder/decoder factories.
@@ -129,74 +129,14 @@
void PeerConnectionE2EQualityTest::ExecuteAt(
TimeDelta target_time_since_start,
std::function<void(TimeDelta)> func) {
- ExecuteTask(target_time_since_start, absl::nullopt, func);
+ executor_->ScheduleActivity(target_time_since_start, absl::nullopt, func);
}
void PeerConnectionE2EQualityTest::ExecuteEvery(
TimeDelta initial_delay_since_start,
TimeDelta interval,
std::function<void(TimeDelta)> func) {
- ExecuteTask(initial_delay_since_start, interval, func);
-}
-
-void PeerConnectionE2EQualityTest::ExecuteTask(
- TimeDelta initial_delay_since_start,
- absl::optional<TimeDelta> interval,
- std::function<void(TimeDelta)> func) {
- RTC_CHECK(initial_delay_since_start.IsFinite() &&
- initial_delay_since_start >= TimeDelta::Zero());
- RTC_CHECK(!interval ||
- (interval->IsFinite() && *interval > TimeDelta::Zero()));
- rtc::CritScope crit(&lock_);
- ScheduledActivity activity(initial_delay_since_start, interval, func);
- if (start_time_.IsInfinite()) {
- scheduled_activities_.push(std::move(activity));
- } else {
- PostTask(std::move(activity));
- }
-}
-
-void PeerConnectionE2EQualityTest::PostTask(ScheduledActivity activity) {
- // Because start_time_ will never change at this point copy it to local
- // variable to capture in in lambda without requirement to hold a lock.
- Timestamp start_time = start_time_;
-
- TimeDelta remaining_delay =
- activity.initial_delay_since_start == TimeDelta::Zero()
- ? TimeDelta::Zero()
- : activity.initial_delay_since_start - (Now() - start_time_);
- if (remaining_delay < TimeDelta::Zero()) {
- RTC_LOG(WARNING) << "Executing late task immediately, late by="
- << ToString(remaining_delay.Abs());
- remaining_delay = TimeDelta::Zero();
- }
-
- if (activity.interval) {
- if (remaining_delay == TimeDelta::Zero()) {
- repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
- task_queue_->Get(), [activity, start_time, this]() {
- activity.func(Now() - start_time);
- return *activity.interval;
- }));
- return;
- }
- repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
- task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
- activity.func(Now() - start_time);
- return *activity.interval;
- }));
- return;
- }
-
- if (remaining_delay == TimeDelta::Zero()) {
- task_queue_->PostTask(
- [activity, start_time, this]() { activity.func(Now() - start_time); });
- return;
- }
-
- task_queue_->PostDelayedTask(
- [activity, start_time, this]() { activity.func(Now() - start_time); },
- remaining_delay.ms());
+ executor_->ScheduleActivity(initial_delay_since_start, interval, func);
}
void PeerConnectionE2EQualityTest::AddQualityMetricsReporter(
@@ -342,20 +282,7 @@
RTC_LOG(INFO) << "Configuration is done. Now Alice is calling to Bob...";
- // Setup call.
- signaling_thread->Invoke<void>(
- RTC_FROM_HERE,
- rtc::Bind(&PeerConnectionE2EQualityTest::SetupCallOnSignalingThread, this,
- run_params));
- {
- rtc::CritScope crit(&lock_);
- start_time_ = Now();
- while (!scheduled_activities_.empty()) {
- PostTask(std::move(scheduled_activities_.front()));
- scheduled_activities_.pop();
- }
- }
-
+ // Setup stats poller.
std::vector<StatsObserverInterface*> observers = {
audio_quality_analyzer_.get(),
video_quality_analyzer_injection_helper_.get()};
@@ -364,16 +291,18 @@
}
StatsPoller stats_poller(observers,
{{"alice", alice_.get()}, {"bob", bob_.get()}});
+ executor_->ScheduleActivity(TimeDelta::Zero(), kStatsUpdateInterval,
+ [&stats_poller](TimeDelta) {
+ stats_poller.PollStatsAndNotifyObservers();
+ });
- task_queue_->PostTask([&stats_poller, this]() {
- RTC_DCHECK_RUN_ON(task_queue_.get());
- stats_polling_task_ =
- RepeatingTaskHandle::Start(task_queue_->Get(), [this, &stats_poller]() {
- RTC_DCHECK_RUN_ON(task_queue_.get());
- stats_poller.PollStatsAndNotifyObservers();
- return kStatsUpdateInterval;
- });
- });
+ // Setup call.
+ signaling_thread->Invoke<void>(
+ RTC_FROM_HERE,
+ rtc::Bind(&PeerConnectionE2EQualityTest::SetupCallOnSignalingThread, this,
+ run_params));
+ executor_->Start(task_queue_.get());
+ Timestamp start_time = Now();
rtc::Event done;
bool is_quick_test_enabled = field_trial::IsEnabled("WebRTC-QuickPerfTest");
@@ -385,30 +314,13 @@
RTC_LOG(INFO) << "Test is done, initiating disconnect sequence.";
- task_queue_->SendTask(
- [&stats_poller, this]() {
- RTC_DCHECK_RUN_ON(task_queue_.get());
- stats_polling_task_.Stop();
- // Get final end-of-call stats.
- stats_poller.PollStatsAndNotifyObservers();
- },
- RTC_FROM_HERE);
-
+ // Stop all client started tasks to prevent their access to any call related
+ // objects after these objects will be destroyed during call tear down.
+ executor_->Stop();
// We need to detach AEC dumping from peers, because dump uses |task_queue_|
// inside.
alice_->DetachAecDump();
bob_->DetachAecDump();
- // Stop all client started tasks on task queue to prevent their access to any
- // call related objects after these objects will be destroyed during call tear
- // down.
- task_queue_->SendTask(
- [this]() {
- rtc::CritScope crit(&lock_);
- for (auto& handle : repeating_task_handles_) {
- handle.Stop();
- }
- },
- RTC_FROM_HERE);
// Tear down the call.
signaling_thread->Invoke<void>(
RTC_FROM_HERE,
@@ -418,7 +330,7 @@
RTC_LOG(INFO) << "All peers are disconnected.";
{
rtc::CritScope crit(&lock_);
- real_test_duration_ = end_time - start_time_;
+ real_test_duration_ = end_time - start_time;
}
audio_quality_analyzer_->Stop();
@@ -729,13 +641,5 @@
return clock_->CurrentTime();
}
-PeerConnectionE2EQualityTest::ScheduledActivity::ScheduledActivity(
- TimeDelta initial_delay_since_start,
- absl::optional<TimeDelta> interval,
- std::function<void(TimeDelta)> func)
- : initial_delay_since_start(initial_delay_since_start),
- interval(interval),
- func(std::move(func)) {}
-
} // namespace webrtc_pc_e2e
} // namespace webrtc
diff --git a/test/pc/e2e/peer_connection_quality_test.h b/test/pc/e2e/peer_connection_quality_test.h
index 73e2663..081235b 100644
--- a/test/pc/e2e/peer_connection_quality_test.h
+++ b/test/pc/e2e/peer_connection_quality_test.h
@@ -21,7 +21,6 @@
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/task_queue_for_test.h"
-#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
#include "system_wrappers/include/clock.h"
@@ -33,6 +32,7 @@
#include "test/pc/e2e/peer_configurer.h"
#include "test/pc/e2e/peer_connection_quality_test_params.h"
#include "test/pc/e2e/sdp/sdp_changer.h"
+#include "test/pc/e2e/test_activities_executor.h"
#include "test/pc/e2e/test_peer.h"
namespace webrtc {
@@ -79,20 +79,6 @@
}
private:
- struct ScheduledActivity {
- ScheduledActivity(TimeDelta initial_delay_since_start,
- absl::optional<TimeDelta> interval,
- std::function<void(TimeDelta)> func);
-
- TimeDelta initial_delay_since_start;
- absl::optional<TimeDelta> interval;
- std::function<void(TimeDelta)> func;
- };
-
- void ExecuteTask(TimeDelta initial_delay_since_start,
- absl::optional<TimeDelta> interval,
- std::function<void(TimeDelta)> func);
- void PostTask(ScheduledActivity activity) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
// For some functionality some field trials have to be enabled, so we will
// enable them here.
void SetupRequiredFieldTrials(const RunParams& run_params);
@@ -120,6 +106,7 @@
std::unique_ptr<SingleProcessEncodedImageDataInjector>
encoded_image_id_controller_;
std::unique_ptr<AudioQualityAnalyzerInterface> audio_quality_analyzer_;
+ std::unique_ptr<TestActivitiesExecutor> executor_;
std::vector<std::unique_ptr<PeerConfigurerImpl>> peer_configurations_;
@@ -139,20 +126,7 @@
AnalyzerHelper analyzer_helper_;
rtc::CriticalSection lock_;
- // Time when test call was started. Minus infinity means that call wasn't
- // started yet.
- Timestamp start_time_ RTC_GUARDED_BY(lock_) = Timestamp::MinusInfinity();
TimeDelta real_test_duration_ RTC_GUARDED_BY(lock_) = TimeDelta::Zero();
- // Queue of activities that were added before test call was started.
- // Activities from this queue will be posted on the |task_queue_| after test
- // call will be set up and then this queue will be unused.
- std::queue<ScheduledActivity> scheduled_activities_ RTC_GUARDED_BY(lock_);
- // List of task handles for activities, that are posted on |task_queue_| as
- // repeated during the call.
- std::vector<RepeatingTaskHandle> repeating_task_handles_
- RTC_GUARDED_BY(lock_);
-
- RepeatingTaskHandle stats_polling_task_ RTC_GUARDED_BY(&task_queue_);
// Task queue, that is used for running activities during test call.
// This task queue will be created before call set up and will be destroyed
diff --git a/test/pc/e2e/test_activities_executor.cc b/test/pc/e2e/test_activities_executor.cc
new file mode 100644
index 0000000..4ace6ae
--- /dev/null
+++ b/test/pc/e2e/test_activities_executor.cc
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/test_activities_executor.h"
+
+#include <memory>
+#include <utility>
+
+#include "absl/memory/memory.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/location.h"
+#include "rtc_base/logging.h"
+
+namespace webrtc {
+namespace webrtc_pc_e2e {
+
+void TestActivitiesExecutor::Start(TaskQueueForTest* task_queue) {
+ RTC_DCHECK(task_queue);
+ task_queue_ = task_queue;
+ rtc::CritScope crit(&lock_);
+ start_time_ = Now();
+ while (!scheduled_activities_.empty()) {
+ PostActivity(std::move(scheduled_activities_.front()));
+ scheduled_activities_.pop();
+ }
+}
+
+void TestActivitiesExecutor::Stop() {
+ if (task_queue_ == nullptr) {
+ // Already stopped or not started.
+ return;
+ }
+ task_queue_->SendTask(
+ [this]() {
+ rtc::CritScope crit(&lock_);
+ for (auto& handle : repeating_task_handles_) {
+ handle.Stop();
+ }
+ },
+ RTC_FROM_HERE);
+ task_queue_ = nullptr;
+}
+
+void TestActivitiesExecutor::ScheduleActivity(
+ TimeDelta initial_delay_since_start,
+ absl::optional<TimeDelta> interval,
+ std::function<void(TimeDelta)> func) {
+ RTC_CHECK(initial_delay_since_start.IsFinite() &&
+ initial_delay_since_start >= TimeDelta::Zero());
+ RTC_CHECK(!interval ||
+ (interval->IsFinite() && *interval > TimeDelta::Zero()));
+ rtc::CritScope crit(&lock_);
+ ScheduledActivity activity(initial_delay_since_start, interval, func);
+ if (start_time_.IsInfinite()) {
+ scheduled_activities_.push(std::move(activity));
+ } else {
+ PostActivity(std::move(activity));
+ }
+}
+
+void TestActivitiesExecutor::PostActivity(ScheduledActivity activity) {
+ // Because start_time_ will never change at this point copy it to local
+ // variable to capture in in lambda without requirement to hold a lock.
+ Timestamp start_time = start_time_;
+
+ TimeDelta remaining_delay =
+ activity.initial_delay_since_start == TimeDelta::Zero()
+ ? TimeDelta::Zero()
+ : activity.initial_delay_since_start - (Now() - start_time);
+ if (remaining_delay < TimeDelta::Zero()) {
+ RTC_LOG(WARNING) << "Executing late task immediately, late by="
+ << ToString(remaining_delay.Abs());
+ remaining_delay = TimeDelta::Zero();
+ }
+
+ if (activity.interval) {
+ if (remaining_delay == TimeDelta::Zero()) {
+ repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
+ task_queue_->Get(), [activity, start_time, this]() {
+ activity.func(Now() - start_time);
+ return *activity.interval;
+ }));
+ return;
+ }
+ repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
+ task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
+ activity.func(Now() - start_time);
+ return *activity.interval;
+ }));
+ return;
+ }
+
+ if (remaining_delay == TimeDelta::Zero()) {
+ task_queue_->PostTask(
+ [activity, start_time, this]() { activity.func(Now() - start_time); });
+ return;
+ }
+
+ task_queue_->PostDelayedTask(
+ [activity, start_time, this]() { activity.func(Now() - start_time); },
+ remaining_delay.ms());
+}
+
+Timestamp TestActivitiesExecutor::Now() const {
+ return clock_->CurrentTime();
+}
+
+TestActivitiesExecutor::ScheduledActivity::ScheduledActivity(
+ TimeDelta initial_delay_since_start,
+ absl::optional<TimeDelta> interval,
+ std::function<void(TimeDelta)> func)
+ : initial_delay_since_start(initial_delay_since_start),
+ interval(interval),
+ func(std::move(func)) {}
+
+} // namespace webrtc_pc_e2e
+} // namespace webrtc
diff --git a/test/pc/e2e/test_activities_executor.h b/test/pc/e2e/test_activities_executor.h
new file mode 100644
index 0000000..09bfe41
--- /dev/null
+++ b/test/pc/e2e/test_activities_executor.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_
+#define TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_
+
+#include <queue>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/task_queue_for_test.h"
+#include "rtc_base/task_utils/repeating_task.h"
+#include "system_wrappers/include/clock.h"
+
+namespace webrtc {
+namespace webrtc_pc_e2e {
+
+class TestActivitiesExecutor {
+ public:
+ explicit TestActivitiesExecutor(Clock* clock) : clock_(clock) {}
+ ~TestActivitiesExecutor() { Stop(); }
+
+ // Starts scheduled activities according to their schedule. All activities
+ // that will be scheduled after Start(...) was invoked will be executed
+ // immediately according to their schedule.
+ void Start(TaskQueueForTest* task_queue);
+ void Stop();
+
+ // Schedule activity to be executed. If test isn't started yet, then activity
+ // will be executed according to its schedule after Start() will be invoked.
+ // If test is started, then it will be executed immediately according to its
+ // schedule.
+ void ScheduleActivity(TimeDelta initial_delay_since_start,
+ absl::optional<TimeDelta> interval,
+ std::function<void(TimeDelta)> func);
+
+ private:
+ struct ScheduledActivity {
+ ScheduledActivity(TimeDelta initial_delay_since_start,
+ absl::optional<TimeDelta> interval,
+ std::function<void(TimeDelta)> func);
+
+ TimeDelta initial_delay_since_start;
+ absl::optional<TimeDelta> interval;
+ std::function<void(TimeDelta)> func;
+ };
+
+ void PostActivity(ScheduledActivity activity)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+ Timestamp Now() const;
+
+ Clock* const clock_;
+
+ TaskQueueForTest* task_queue_;
+
+ rtc::CriticalSection lock_;
+ // Time when test was started. Minus infinity means that it wasn't started
+ // yet.
+ Timestamp start_time_ RTC_GUARDED_BY(lock_) = Timestamp::MinusInfinity();
+ // Queue of activities that were added before test was started.
+ // Activities from this queue will be posted on the |task_queue_| after test
+ // will be set up and then this queue will be unused.
+ std::queue<ScheduledActivity> scheduled_activities_ RTC_GUARDED_BY(lock_);
+ // List of task handles for activities, that are posted on |task_queue_| as
+ // repeated during the call.
+ std::vector<RepeatingTaskHandle> repeating_task_handles_
+ RTC_GUARDED_BY(lock_);
+};
+
+} // namespace webrtc_pc_e2e
+} // namespace webrtc
+
+#endif // TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_