Extract activity executor into separate class from PC level fixture impl

Bug: webrtc:11479
Change-Id: Ida9c944d928e9973bf543a2e5b415a7c9007b833
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/173024
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31032}
diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn
index f238f9b..537b6a5 100644
--- a/test/pc/e2e/BUILD.gn
+++ b/test/pc/e2e/BUILD.gn
@@ -304,6 +304,28 @@
     ]
   }
 
+  rtc_library("test_activities_executor") {
+    visibility = [ "*" ]
+    testonly = true
+    sources = [
+      "test_activities_executor.cc",
+      "test_activities_executor.h",
+    ]
+    deps = [
+      "../../../api/units:time_delta",
+      "../../../api/units:timestamp",
+      "../../../rtc_base:checks",
+      "../../../rtc_base:criticalsection",
+      "../../../rtc_base:logging",
+      "../../../rtc_base:rtc_base_approved",
+      "../../../rtc_base:task_queue_for_test",
+      "../../../rtc_base/task_utils:repeating_task",
+      "../../../system_wrappers",
+      "//third_party/abseil-cpp/absl/memory",
+      "//third_party/abseil-cpp/absl/types:optional",
+    ]
+  }
+
   rtc_library("peerconnection_quality_test") {
     visibility = [ "*" ]
     testonly = true
@@ -322,6 +344,7 @@
       ":sdp_changer",
       ":single_process_encoded_image_data_injector",
       ":stats_poller",
+      ":test_activities_executor",
       ":test_peer",
       ":test_peer_factory",
       ":video_quality_analyzer_injection_helper",
@@ -348,7 +371,6 @@
       "../../../rtc_base:rtc_base_approved",
       "../../../rtc_base:safe_conversions",
       "../../../rtc_base:task_queue_for_test",
-      "../../../rtc_base/task_utils:repeating_task",
       "../../../system_wrappers",
       "../../../system_wrappers:field_trial",
     ]
diff --git a/test/pc/e2e/peer_connection_quality_test.cc b/test/pc/e2e/peer_connection_quality_test.cc
index 8b2734d..d97eeba 100644
--- a/test/pc/e2e/peer_connection_quality_test.cc
+++ b/test/pc/e2e/peer_connection_quality_test.cc
@@ -22,7 +22,6 @@
 #include "api/scoped_refptr.h"
 #include "api/task_queue/default_task_queue_factory.h"
 #include "api/test/video_quality_analyzer_interface.h"
-#include "api/units/time_delta.h"
 #include "pc/sdp_utils.h"
 #include "pc/test/mock_peer_connection_observers.h"
 #include "rtc_base/bind.h"
@@ -106,7 +105,8 @@
     std::unique_ptr<VideoQualityAnalyzerInterface> video_quality_analyzer)
     : clock_(Clock::GetRealTimeClock()),
       task_queue_factory_(CreateDefaultTaskQueueFactory()),
-      test_case_name_(std::move(test_case_name)) {
+      test_case_name_(std::move(test_case_name)),
+      executor_(std::make_unique<TestActivitiesExecutor>(clock_)) {
   // Create default video quality analyzer. We will always create an analyzer,
   // even if there are no video streams, because it will be installed into video
   // encoder/decoder factories.
@@ -129,74 +129,14 @@
 void PeerConnectionE2EQualityTest::ExecuteAt(
     TimeDelta target_time_since_start,
     std::function<void(TimeDelta)> func) {
-  ExecuteTask(target_time_since_start, absl::nullopt, func);
+  executor_->ScheduleActivity(target_time_since_start, absl::nullopt, func);
 }
 
 void PeerConnectionE2EQualityTest::ExecuteEvery(
     TimeDelta initial_delay_since_start,
     TimeDelta interval,
     std::function<void(TimeDelta)> func) {
-  ExecuteTask(initial_delay_since_start, interval, func);
-}
-
-void PeerConnectionE2EQualityTest::ExecuteTask(
-    TimeDelta initial_delay_since_start,
-    absl::optional<TimeDelta> interval,
-    std::function<void(TimeDelta)> func) {
-  RTC_CHECK(initial_delay_since_start.IsFinite() &&
-            initial_delay_since_start >= TimeDelta::Zero());
-  RTC_CHECK(!interval ||
-            (interval->IsFinite() && *interval > TimeDelta::Zero()));
-  rtc::CritScope crit(&lock_);
-  ScheduledActivity activity(initial_delay_since_start, interval, func);
-  if (start_time_.IsInfinite()) {
-    scheduled_activities_.push(std::move(activity));
-  } else {
-    PostTask(std::move(activity));
-  }
-}
-
-void PeerConnectionE2EQualityTest::PostTask(ScheduledActivity activity) {
-  // Because start_time_ will never change at this point copy it to local
-  // variable to capture in in lambda without requirement to hold a lock.
-  Timestamp start_time = start_time_;
-
-  TimeDelta remaining_delay =
-      activity.initial_delay_since_start == TimeDelta::Zero()
-          ? TimeDelta::Zero()
-          : activity.initial_delay_since_start - (Now() - start_time_);
-  if (remaining_delay < TimeDelta::Zero()) {
-    RTC_LOG(WARNING) << "Executing late task immediately, late by="
-                     << ToString(remaining_delay.Abs());
-    remaining_delay = TimeDelta::Zero();
-  }
-
-  if (activity.interval) {
-    if (remaining_delay == TimeDelta::Zero()) {
-      repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
-          task_queue_->Get(), [activity, start_time, this]() {
-            activity.func(Now() - start_time);
-            return *activity.interval;
-          }));
-      return;
-    }
-    repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
-        task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
-          activity.func(Now() - start_time);
-          return *activity.interval;
-        }));
-    return;
-  }
-
-  if (remaining_delay == TimeDelta::Zero()) {
-    task_queue_->PostTask(
-        [activity, start_time, this]() { activity.func(Now() - start_time); });
-    return;
-  }
-
-  task_queue_->PostDelayedTask(
-      [activity, start_time, this]() { activity.func(Now() - start_time); },
-      remaining_delay.ms());
+  executor_->ScheduleActivity(initial_delay_since_start, interval, func);
 }
 
 void PeerConnectionE2EQualityTest::AddQualityMetricsReporter(
@@ -342,20 +282,7 @@
 
   RTC_LOG(INFO) << "Configuration is done. Now Alice is calling to Bob...";
 
-  // Setup call.
-  signaling_thread->Invoke<void>(
-      RTC_FROM_HERE,
-      rtc::Bind(&PeerConnectionE2EQualityTest::SetupCallOnSignalingThread, this,
-                run_params));
-  {
-    rtc::CritScope crit(&lock_);
-    start_time_ = Now();
-    while (!scheduled_activities_.empty()) {
-      PostTask(std::move(scheduled_activities_.front()));
-      scheduled_activities_.pop();
-    }
-  }
-
+  // Setup stats poller.
   std::vector<StatsObserverInterface*> observers = {
       audio_quality_analyzer_.get(),
       video_quality_analyzer_injection_helper_.get()};
@@ -364,16 +291,18 @@
   }
   StatsPoller stats_poller(observers,
                            {{"alice", alice_.get()}, {"bob", bob_.get()}});
+  executor_->ScheduleActivity(TimeDelta::Zero(), kStatsUpdateInterval,
+                              [&stats_poller](TimeDelta) {
+                                stats_poller.PollStatsAndNotifyObservers();
+                              });
 
-  task_queue_->PostTask([&stats_poller, this]() {
-    RTC_DCHECK_RUN_ON(task_queue_.get());
-    stats_polling_task_ =
-        RepeatingTaskHandle::Start(task_queue_->Get(), [this, &stats_poller]() {
-          RTC_DCHECK_RUN_ON(task_queue_.get());
-          stats_poller.PollStatsAndNotifyObservers();
-          return kStatsUpdateInterval;
-        });
-  });
+  // Setup call.
+  signaling_thread->Invoke<void>(
+      RTC_FROM_HERE,
+      rtc::Bind(&PeerConnectionE2EQualityTest::SetupCallOnSignalingThread, this,
+                run_params));
+  executor_->Start(task_queue_.get());
+  Timestamp start_time = Now();
 
   rtc::Event done;
   bool is_quick_test_enabled = field_trial::IsEnabled("WebRTC-QuickPerfTest");
@@ -385,30 +314,13 @@
 
   RTC_LOG(INFO) << "Test is done, initiating disconnect sequence.";
 
-  task_queue_->SendTask(
-      [&stats_poller, this]() {
-        RTC_DCHECK_RUN_ON(task_queue_.get());
-        stats_polling_task_.Stop();
-        // Get final end-of-call stats.
-        stats_poller.PollStatsAndNotifyObservers();
-      },
-      RTC_FROM_HERE);
-
+  // Stop all client started tasks to prevent their access to any call related
+  // objects after these objects will be destroyed during call tear down.
+  executor_->Stop();
   // We need to detach AEC dumping from peers, because dump uses |task_queue_|
   // inside.
   alice_->DetachAecDump();
   bob_->DetachAecDump();
-  // Stop all client started tasks on task queue to prevent their access to any
-  // call related objects after these objects will be destroyed during call tear
-  // down.
-  task_queue_->SendTask(
-      [this]() {
-        rtc::CritScope crit(&lock_);
-        for (auto& handle : repeating_task_handles_) {
-          handle.Stop();
-        }
-      },
-      RTC_FROM_HERE);
   // Tear down the call.
   signaling_thread->Invoke<void>(
       RTC_FROM_HERE,
@@ -418,7 +330,7 @@
   RTC_LOG(INFO) << "All peers are disconnected.";
   {
     rtc::CritScope crit(&lock_);
-    real_test_duration_ = end_time - start_time_;
+    real_test_duration_ = end_time - start_time;
   }
 
   audio_quality_analyzer_->Stop();
@@ -729,13 +641,5 @@
   return clock_->CurrentTime();
 }
 
-PeerConnectionE2EQualityTest::ScheduledActivity::ScheduledActivity(
-    TimeDelta initial_delay_since_start,
-    absl::optional<TimeDelta> interval,
-    std::function<void(TimeDelta)> func)
-    : initial_delay_since_start(initial_delay_since_start),
-      interval(interval),
-      func(std::move(func)) {}
-
 }  // namespace webrtc_pc_e2e
 }  // namespace webrtc
diff --git a/test/pc/e2e/peer_connection_quality_test.h b/test/pc/e2e/peer_connection_quality_test.h
index 73e2663..081235b 100644
--- a/test/pc/e2e/peer_connection_quality_test.h
+++ b/test/pc/e2e/peer_connection_quality_test.h
@@ -21,7 +21,6 @@
 #include "api/units/time_delta.h"
 #include "api/units/timestamp.h"
 #include "rtc_base/task_queue_for_test.h"
-#include "rtc_base/task_utils/repeating_task.h"
 #include "rtc_base/thread.h"
 #include "rtc_base/thread_annotations.h"
 #include "system_wrappers/include/clock.h"
@@ -33,6 +32,7 @@
 #include "test/pc/e2e/peer_configurer.h"
 #include "test/pc/e2e/peer_connection_quality_test_params.h"
 #include "test/pc/e2e/sdp/sdp_changer.h"
+#include "test/pc/e2e/test_activities_executor.h"
 #include "test/pc/e2e/test_peer.h"
 
 namespace webrtc {
@@ -79,20 +79,6 @@
   }
 
  private:
-  struct ScheduledActivity {
-    ScheduledActivity(TimeDelta initial_delay_since_start,
-                      absl::optional<TimeDelta> interval,
-                      std::function<void(TimeDelta)> func);
-
-    TimeDelta initial_delay_since_start;
-    absl::optional<TimeDelta> interval;
-    std::function<void(TimeDelta)> func;
-  };
-
-  void ExecuteTask(TimeDelta initial_delay_since_start,
-                   absl::optional<TimeDelta> interval,
-                   std::function<void(TimeDelta)> func);
-  void PostTask(ScheduledActivity activity) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
   // For some functionality some field trials have to be enabled, so we will
   // enable them here.
   void SetupRequiredFieldTrials(const RunParams& run_params);
@@ -120,6 +106,7 @@
   std::unique_ptr<SingleProcessEncodedImageDataInjector>
       encoded_image_id_controller_;
   std::unique_ptr<AudioQualityAnalyzerInterface> audio_quality_analyzer_;
+  std::unique_ptr<TestActivitiesExecutor> executor_;
 
   std::vector<std::unique_ptr<PeerConfigurerImpl>> peer_configurations_;
 
@@ -139,20 +126,7 @@
   AnalyzerHelper analyzer_helper_;
 
   rtc::CriticalSection lock_;
-  // Time when test call was started. Minus infinity means that call wasn't
-  // started yet.
-  Timestamp start_time_ RTC_GUARDED_BY(lock_) = Timestamp::MinusInfinity();
   TimeDelta real_test_duration_ RTC_GUARDED_BY(lock_) = TimeDelta::Zero();
-  // Queue of activities that were added before test call was started.
-  // Activities from this queue will be posted on the |task_queue_| after test
-  // call will be set up and then this queue will be unused.
-  std::queue<ScheduledActivity> scheduled_activities_ RTC_GUARDED_BY(lock_);
-  // List of task handles for activities, that are posted on |task_queue_| as
-  // repeated during the call.
-  std::vector<RepeatingTaskHandle> repeating_task_handles_
-      RTC_GUARDED_BY(lock_);
-
-  RepeatingTaskHandle stats_polling_task_ RTC_GUARDED_BY(&task_queue_);
 
   // Task queue, that is used for running activities during test call.
   // This task queue will be created before call set up and will be destroyed
diff --git a/test/pc/e2e/test_activities_executor.cc b/test/pc/e2e/test_activities_executor.cc
new file mode 100644
index 0000000..4ace6ae
--- /dev/null
+++ b/test/pc/e2e/test_activities_executor.cc
@@ -0,0 +1,124 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/pc/e2e/test_activities_executor.h"
+
+#include <memory>
+#include <utility>
+
+#include "absl/memory/memory.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/location.h"
+#include "rtc_base/logging.h"
+
+namespace webrtc {
+namespace webrtc_pc_e2e {
+
+void TestActivitiesExecutor::Start(TaskQueueForTest* task_queue) {
+  RTC_DCHECK(task_queue);
+  task_queue_ = task_queue;
+  rtc::CritScope crit(&lock_);
+  start_time_ = Now();
+  while (!scheduled_activities_.empty()) {
+    PostActivity(std::move(scheduled_activities_.front()));
+    scheduled_activities_.pop();
+  }
+}
+
+void TestActivitiesExecutor::Stop() {
+  if (task_queue_ == nullptr) {
+    // Already stopped or not started.
+    return;
+  }
+  task_queue_->SendTask(
+      [this]() {
+        rtc::CritScope crit(&lock_);
+        for (auto& handle : repeating_task_handles_) {
+          handle.Stop();
+        }
+      },
+      RTC_FROM_HERE);
+  task_queue_ = nullptr;
+}
+
+void TestActivitiesExecutor::ScheduleActivity(
+    TimeDelta initial_delay_since_start,
+    absl::optional<TimeDelta> interval,
+    std::function<void(TimeDelta)> func) {
+  RTC_CHECK(initial_delay_since_start.IsFinite() &&
+            initial_delay_since_start >= TimeDelta::Zero());
+  RTC_CHECK(!interval ||
+            (interval->IsFinite() && *interval > TimeDelta::Zero()));
+  rtc::CritScope crit(&lock_);
+  ScheduledActivity activity(initial_delay_since_start, interval, func);
+  if (start_time_.IsInfinite()) {
+    scheduled_activities_.push(std::move(activity));
+  } else {
+    PostActivity(std::move(activity));
+  }
+}
+
+void TestActivitiesExecutor::PostActivity(ScheduledActivity activity) {
+  // Because start_time_ will never change at this point copy it to local
+  // variable to capture in in lambda without requirement to hold a lock.
+  Timestamp start_time = start_time_;
+
+  TimeDelta remaining_delay =
+      activity.initial_delay_since_start == TimeDelta::Zero()
+          ? TimeDelta::Zero()
+          : activity.initial_delay_since_start - (Now() - start_time);
+  if (remaining_delay < TimeDelta::Zero()) {
+    RTC_LOG(WARNING) << "Executing late task immediately, late by="
+                     << ToString(remaining_delay.Abs());
+    remaining_delay = TimeDelta::Zero();
+  }
+
+  if (activity.interval) {
+    if (remaining_delay == TimeDelta::Zero()) {
+      repeating_task_handles_.push_back(RepeatingTaskHandle::Start(
+          task_queue_->Get(), [activity, start_time, this]() {
+            activity.func(Now() - start_time);
+            return *activity.interval;
+          }));
+      return;
+    }
+    repeating_task_handles_.push_back(RepeatingTaskHandle::DelayedStart(
+        task_queue_->Get(), remaining_delay, [activity, start_time, this]() {
+          activity.func(Now() - start_time);
+          return *activity.interval;
+        }));
+    return;
+  }
+
+  if (remaining_delay == TimeDelta::Zero()) {
+    task_queue_->PostTask(
+        [activity, start_time, this]() { activity.func(Now() - start_time); });
+    return;
+  }
+
+  task_queue_->PostDelayedTask(
+      [activity, start_time, this]() { activity.func(Now() - start_time); },
+      remaining_delay.ms());
+}
+
+Timestamp TestActivitiesExecutor::Now() const {
+  return clock_->CurrentTime();
+}
+
+TestActivitiesExecutor::ScheduledActivity::ScheduledActivity(
+    TimeDelta initial_delay_since_start,
+    absl::optional<TimeDelta> interval,
+    std::function<void(TimeDelta)> func)
+    : initial_delay_since_start(initial_delay_since_start),
+      interval(interval),
+      func(std::move(func)) {}
+
+}  // namespace webrtc_pc_e2e
+}  // namespace webrtc
diff --git a/test/pc/e2e/test_activities_executor.h b/test/pc/e2e/test_activities_executor.h
new file mode 100644
index 0000000..09bfe41
--- /dev/null
+++ b/test/pc/e2e/test_activities_executor.h
@@ -0,0 +1,83 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_
+#define TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_
+
+#include <queue>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/task_queue_for_test.h"
+#include "rtc_base/task_utils/repeating_task.h"
+#include "system_wrappers/include/clock.h"
+
+namespace webrtc {
+namespace webrtc_pc_e2e {
+
+class TestActivitiesExecutor {
+ public:
+  explicit TestActivitiesExecutor(Clock* clock) : clock_(clock) {}
+  ~TestActivitiesExecutor() { Stop(); }
+
+  // Starts scheduled activities according to their schedule. All activities
+  // that will be scheduled after Start(...) was invoked will be executed
+  // immediately according to their schedule.
+  void Start(TaskQueueForTest* task_queue);
+  void Stop();
+
+  // Schedule activity to be executed. If test isn't started yet, then activity
+  // will be executed according to its schedule after Start() will be invoked.
+  // If test is started, then it will be executed immediately according to its
+  // schedule.
+  void ScheduleActivity(TimeDelta initial_delay_since_start,
+                        absl::optional<TimeDelta> interval,
+                        std::function<void(TimeDelta)> func);
+
+ private:
+  struct ScheduledActivity {
+    ScheduledActivity(TimeDelta initial_delay_since_start,
+                      absl::optional<TimeDelta> interval,
+                      std::function<void(TimeDelta)> func);
+
+    TimeDelta initial_delay_since_start;
+    absl::optional<TimeDelta> interval;
+    std::function<void(TimeDelta)> func;
+  };
+
+  void PostActivity(ScheduledActivity activity)
+      RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+  Timestamp Now() const;
+
+  Clock* const clock_;
+
+  TaskQueueForTest* task_queue_;
+
+  rtc::CriticalSection lock_;
+  // Time when test was started. Minus infinity means that it wasn't started
+  // yet.
+  Timestamp start_time_ RTC_GUARDED_BY(lock_) = Timestamp::MinusInfinity();
+  // Queue of activities that were added before test was started.
+  // Activities from this queue will be posted on the |task_queue_| after test
+  // will be set up and then this queue will be unused.
+  std::queue<ScheduledActivity> scheduled_activities_ RTC_GUARDED_BY(lock_);
+  // List of task handles for activities, that are posted on |task_queue_| as
+  // repeated during the call.
+  std::vector<RepeatingTaskHandle> repeating_task_handles_
+      RTC_GUARDED_BY(lock_);
+};
+
+}  // namespace webrtc_pc_e2e
+}  // namespace webrtc
+
+#endif  // TEST_PC_E2E_TEST_ACTIVITIES_EXECUTOR_H_