Reland of the test portion of:
https://webrtc-review.googlesource.com/c/src/+/172847

------------ original description --------------

Preparation for ReceiveStatisticsProxy lock reduction.

Update tests to call VideoReceiveStream::GetStats() in the same or at
least similar way it gets called in production (construction thread,
same TQ/thread).

Mapped out threads and context for ReceiveStatisticsProxy,
VideoQualityObserver and VideoReceiveStream. Added
follow-up TODOs for webrtc:11489.

One functional change in ReceiveStatisticsProxy is that when sender
side RtcpPacketTypesCounterUpdated calls are made, the counter is
updated asynchronously since the sender calls the method on a different
thread than the receiver.

Make CallClient::SendTask public to allow tests to run tasks in the
right context. CallClient already does this internally for GetStats.

Remove 10 sec sleep in StopSendingKeyframeRequestsForInactiveStream.

Bug: webrtc:11489
Change-Id: I491e13344b9fa714de0741dd927d907de7e39e83
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/173583
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31077}
diff --git a/BUILD.gn b/BUILD.gn
index b3e7710..4e30a71 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -547,6 +547,7 @@
       "rtc_base:weak_ptr_unittests",
       "rtc_base/experiments:experiments_unittests",
       "rtc_base/synchronization:sequence_checker_unittests",
+      "rtc_base/task_utils:pending_task_safety_flag_unittests",
       "rtc_base/task_utils:to_queued_task_unittests",
       "sdk:sdk_tests",
       "test:rtp_test_utils",
diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc
index 2d23087..123be7d 100644
--- a/call/call_perf_tests.cc
+++ b/call/call_perf_tests.cc
@@ -96,21 +96,24 @@
   static const int kMinRunTimeMs = 30000;
 
  public:
-  explicit VideoRtcpAndSyncObserver(Clock* clock, const std::string& test_label)
+  explicit VideoRtcpAndSyncObserver(TaskQueueBase* task_queue,
+                                    Clock* clock,
+                                    const std::string& test_label)
       : test::RtpRtcpObserver(CallPerfTest::kLongTimeoutMs),
         clock_(clock),
         test_label_(test_label),
         creation_time_ms_(clock_->TimeInMilliseconds()),
-        first_time_in_sync_(-1),
-        receive_stream_(nullptr) {}
+        task_queue_(task_queue) {}
 
   void OnFrame(const VideoFrame& video_frame) override {
-    VideoReceiveStream::Stats stats;
-    {
-      rtc::CritScope lock(&crit_);
-      if (receive_stream_)
-        stats = receive_stream_->GetStats();
-    }
+    task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
+  }
+
+  void CheckStats() {
+    if (!receive_stream_)
+      return;
+
+    VideoReceiveStream::Stats stats = receive_stream_->GetStats();
     if (stats.sync_offset_ms == std::numeric_limits<int>::max())
       return;
 
@@ -135,7 +138,8 @@
   }
 
   void set_receive_stream(VideoReceiveStream* receive_stream) {
-    rtc::CritScope lock(&crit_);
+    RTC_DCHECK_EQ(task_queue_, TaskQueueBase::Current());
+    // Note that receive_stream may be nullptr.
     receive_stream_ = receive_stream;
   }
 
@@ -148,10 +152,10 @@
   Clock* const clock_;
   std::string test_label_;
   const int64_t creation_time_ms_;
-  int64_t first_time_in_sync_;
-  rtc::CriticalSection crit_;
-  VideoReceiveStream* receive_stream_ RTC_GUARDED_BY(crit_);
+  int64_t first_time_in_sync_ = -1;
+  VideoReceiveStream* receive_stream_ = nullptr;
   std::vector<double> sync_offset_ms_list_;
+  TaskQueueBase* const task_queue_;
 };
 
 void CallPerfTest::TestAudioVideoSync(FecMode fec,
@@ -168,7 +172,8 @@
   audio_net_config.queue_delay_ms = 500;
   audio_net_config.loss_percent = 5;
 
-  VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock(), test_label);
+  auto observer = std::make_unique<VideoRtcpAndSyncObserver>(
+      task_queue(), Clock::GetRealTimeClock(), test_label);
 
   std::map<uint8_t, MediaType> audio_pt_map;
   std::map<uint8_t, MediaType> video_pt_map;
@@ -218,7 +223,7 @@
                  });
 
     audio_send_transport = std::make_unique<test::PacketTransport>(
-        task_queue(), sender_call_.get(), &observer,
+        task_queue(), sender_call_.get(), observer.get(),
         test::PacketTransport::kSender, audio_pt_map,
         std::make_unique<FakeNetworkPipe>(
             Clock::GetRealTimeClock(),
@@ -226,7 +231,7 @@
     audio_send_transport->SetReceiver(receiver_call_->Receiver());
 
     video_send_transport = std::make_unique<test::PacketTransport>(
-        task_queue(), sender_call_.get(), &observer,
+        task_queue(), sender_call_.get(), observer.get(),
         test::PacketTransport::kSender, video_pt_map,
         std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
                                           std::make_unique<SimulatedNetwork>(
@@ -234,7 +239,7 @@
     video_send_transport->SetReceiver(receiver_call_->Receiver());
 
     receive_transport = std::make_unique<test::PacketTransport>(
-        task_queue(), receiver_call_.get(), &observer,
+        task_queue(), receiver_call_.get(), observer.get(),
         test::PacketTransport::kReceiver, payload_type_map_,
         std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
                                           std::make_unique<SimulatedNetwork>(
@@ -259,7 +264,7 @@
       video_receive_configs_[0].rtp.ulpfec_payload_type = kUlpfecPayloadType;
     }
     video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000;
-    video_receive_configs_[0].renderer = &observer;
+    video_receive_configs_[0].renderer = observer.get();
     video_receive_configs_[0].sync_group = kSyncGroup;
 
     AudioReceiveStream::Config audio_recv_config;
@@ -281,7 +286,7 @@
           receiver_call_->CreateAudioReceiveStream(audio_recv_config);
     }
     EXPECT_EQ(1u, video_receive_streams_.size());
-    observer.set_receive_stream(video_receive_streams_[0]);
+    observer->set_receive_stream(video_receive_streams_[0]);
     drifting_clock = std::make_unique<DriftingClock>(clock_, video_ntp_speed);
     CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed,
                                           kDefaultFramerate, kDefaultWidth,
@@ -293,10 +298,13 @@
     audio_receive_stream->Start();
   });
 
-  EXPECT_TRUE(observer.Wait())
+  EXPECT_TRUE(observer->Wait())
       << "Timed out while waiting for audio and video to be synchronized.";
 
   SendTask(RTC_FROM_HERE, task_queue(), [&]() {
+    // Clear the pointer to the receive stream since it will now be deleted.
+    observer->set_receive_stream(nullptr);
+
     audio_send_stream->Stop();
     audio_receive_stream->Stop();
 
@@ -314,7 +322,7 @@
     DestroyCalls();
   });
 
-  observer.PrintResults();
+  observer->PrintResults();
 
   // In quick test synchronization may not be achieved in time.
   if (!field_trial::IsEnabled("WebRTC-QuickPerfTest")) {
@@ -323,6 +331,9 @@
     EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.AVSyncOffsetInMs"));
 #endif
   }
+
+  task_queue()->PostTask(
+      ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
 }
 
 TEST_F(CallPerfTest, PlaysOutAudioAndVideoInSyncWithoutClockDrift) {
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index 951cd4e..d7d7034 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -526,9 +526,9 @@
   test::NetworkSimulationConfig net_conf;
   net_conf.bandwidth = DataRate::KilobitsPerSec(300);
   auto send_node = s.CreateSimulationNode(net_conf);
+  auto* callee = s.CreateClient("return", call_conf);
   auto* route = s.CreateRoutes(s.CreateClient("send", call_conf), {send_node},
-                               s.CreateClient("return", call_conf),
-                               {s.CreateSimulationNode(net_conf)});
+                               callee, {s.CreateSimulationNode(net_conf)});
 
   test::VideoStreamConfig lossy_config;
   lossy_config.source.framerate = 5;
@@ -556,14 +556,20 @@
   // from initial probing.
   s.RunFor(TimeDelta::Seconds(1));
   rtx_packets = 0;
-  int decoded_baseline = lossy->receive()->GetStats().frames_decoded;
+  int decoded_baseline = 0;
+  callee->SendTask([&decoded_baseline, &lossy]() {
+    decoded_baseline = lossy->receive()->GetStats().frames_decoded;
+  });
   s.RunFor(TimeDelta::Seconds(1));
   // We expect both that RTX packets were sent and that an appropriate number of
   // frames were received. This is somewhat redundant but reduces the risk of
   // false positives in future regressions (e.g. RTX is send due to probing).
   EXPECT_GE(rtx_packets, 1);
-  int frames_decoded =
-      lossy->receive()->GetStats().frames_decoded - decoded_baseline;
+  int frames_decoded = 0;
+  callee->SendTask([&decoded_baseline, &frames_decoded, &lossy]() {
+    frames_decoded =
+        lossy->receive()->GetStats().frames_decoded - decoded_baseline;
+  });
   EXPECT_EQ(frames_decoded, 5);
 }
 
diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
index 1083214..361da92 100644
--- a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
+++ b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
@@ -537,8 +537,8 @@
   auto ret_net = {s.CreateSimulationNode(net_conf)};
 
   auto* client = s.CreateClient("send", CallClientConfig());
-  auto* route = s.CreateRoutes(
-      client, send_net, s.CreateClient("return", CallClientConfig()), ret_net);
+  auto* callee = s.CreateClient("return", CallClientConfig());
+  auto* route = s.CreateRoutes(client, send_net, callee, ret_net);
   // TODO(srte): Make this work with RTX enabled or remove it.
   auto* video = s.CreateVideoStream(route->forward(), [](VideoStreamConfig* c) {
     c->stream.use_rtx = false;
@@ -553,9 +553,17 @@
     s.net()->StopCrossTraffic(tcp_traffic);
     s.RunFor(TimeDelta::Seconds(20));
   }
-  return DataSize::Bytes(video->receive()
-                             ->GetStats()
-                             .rtp_stats.packet_counter.TotalBytes()) /
+
+  // Querying the video stats from within the expected runtime environment
+  // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
+  // we're currently on).
+  VideoReceiveStream::Stats video_receive_stats;
+  auto* video_stream = video->receive();
+  callee->SendTask([&video_stream, &video_receive_stats]() {
+    video_receive_stats = video_stream->GetStats();
+  });
+  return DataSize::Bytes(
+             video_receive_stats.rtp_stats.packet_counter.TotalBytes()) /
          s.TimeSinceStart();
 }
 
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn
index 2e7d53c..8409aa2 100644
--- a/rtc_base/task_utils/BUILD.gn
+++ b/rtc_base/task_utils/BUILD.gn
@@ -26,12 +26,39 @@
   ]
 }
 
+rtc_library("pending_task_safety_flag") {
+  sources = [
+    "pending_task_safety_flag.cc",
+    "pending_task_safety_flag.h",
+  ]
+  deps = [
+    "..:checks",
+    "..:refcount",
+    "..:thread_checker",
+    "../../api:scoped_refptr",
+    "../synchronization:sequence_checker",
+  ]
+}
+
 rtc_source_set("to_queued_task") {
   sources = [ "to_queued_task.h" ]
   deps = [ "../../api/task_queue" ]
 }
 
 if (rtc_include_tests) {
+  rtc_library("pending_task_safety_flag_unittests") {
+    testonly = true
+    sources = [ "pending_task_safety_flag_unittest.cc" ]
+    deps = [
+      ":pending_task_safety_flag",
+      ":to_queued_task",
+      "..:rtc_base_approved",
+      "..:rtc_task_queue",
+      "..:task_queue_for_test",
+      "../../test:test_support",
+    ]
+  }
+
   rtc_library("repeating_task_unittests") {
     testonly = true
     sources = [ "repeating_task_unittest.cc" ]
diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc
new file mode 100644
index 0000000..307d2d5
--- /dev/null
+++ b/rtc_base/task_utils/pending_task_safety_flag.cc
@@ -0,0 +1,32 @@
+/*
+ *  Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+
+#include "rtc_base/ref_counted_object.h"
+
+namespace webrtc {
+
+// static
+PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
+  return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
+}
+
+void PendingTaskSafetyFlag::SetNotAlive() {
+  RTC_DCHECK_RUN_ON(&main_sequence_);
+  alive_ = false;
+}
+
+bool PendingTaskSafetyFlag::alive() const {
+  RTC_DCHECK_RUN_ON(&main_sequence_);
+  return alive_;
+}
+
+}  // namespace webrtc
diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h
new file mode 100644
index 0000000..1b301c8
--- /dev/null
+++ b/rtc_base/task_utils/pending_task_safety_flag.h
@@ -0,0 +1,61 @@
+/*
+ *  Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
+#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
+
+#include "api/scoped_refptr.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/ref_count.h"
+#include "rtc_base/synchronization/sequence_checker.h"
+
+namespace webrtc {
+
+// Use this flag to drop pending tasks that have been posted to the "main"
+// thread/TQ and end up running after the owning instance has been
+// deleted. The owning instance signals deletion by calling SetNotAlive() from
+// its destructor.
+//
+// When posting a task, post a copy (capture by-value in a lambda) of the flag
+// instance and before performing the work, check the |alive()| state. Abort if
+// alive() returns |false|:
+//
+//    // Running outside of the main thread.
+//    my_task_queue_->PostTask(ToQueuedTask(
+//        [safety = pending_task_safety_flag_, this]() {
+//          // Now running on the main thread.
+//          if (!safety->alive())
+//            return;
+//          MyMethod();
+//        }));
+//
+// Note that checking the state only works on the construction/destruction
+// thread of the ReceiveStatisticsProxy instance.
+class PendingTaskSafetyFlag : public rtc::RefCountInterface {
+ public:
+  using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
+  static Pointer Create();
+
+  ~PendingTaskSafetyFlag() = default;
+
+  void SetNotAlive();
+  bool alive() const;
+
+ protected:
+  PendingTaskSafetyFlag() = default;
+
+ private:
+  bool alive_ = true;
+  SequenceChecker main_sequence_;
+};
+
+}  // namespace webrtc
+
+#endif  // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
new file mode 100644
index 0000000..0c1c3c8
--- /dev/null
+++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
@@ -0,0 +1,151 @@
+/*
+ *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+
+#include <memory>
+
+#include "rtc_base/event.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/task_queue_for_test.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace {
+using ::testing::AtLeast;
+using ::testing::Invoke;
+using ::testing::MockFunction;
+using ::testing::NiceMock;
+using ::testing::Return;
+}  // namespace
+
+TEST(PendingTaskSafetyFlagTest, Basic) {
+  PendingTaskSafetyFlag::Pointer safety_flag;
+  {
+    // Scope for the |owner| instance.
+    class Owner {
+     public:
+      Owner() = default;
+      ~Owner() { flag_->SetNotAlive(); }
+
+      PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+    } owner;
+    EXPECT_TRUE(owner.flag_->alive());
+    safety_flag = owner.flag_;
+    EXPECT_TRUE(safety_flag->alive());
+  }
+  EXPECT_FALSE(safety_flag->alive());
+}
+
+TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) {
+  TaskQueueForTest tq1("OwnerHere");
+  TaskQueueForTest tq2("OwnerNotHere");
+
+  class Owner {
+   public:
+    Owner() : tq_main_(TaskQueueBase::Current()) { RTC_DCHECK(tq_main_); }
+    ~Owner() {
+      RTC_DCHECK(tq_main_->IsCurrent());
+      flag_->SetNotAlive();
+    }
+
+    void DoStuff() {
+      RTC_DCHECK(!tq_main_->IsCurrent());
+      tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
+        if (!safe->alive())
+          return;
+        stuff_done_ = true;
+      }));
+    }
+
+    bool stuff_done() const { return stuff_done_; }
+
+   private:
+    TaskQueueBase* const tq_main_;
+    bool stuff_done_ = false;
+    PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+  };
+
+  std::unique_ptr<Owner> owner;
+  tq1.SendTask(
+      [&owner]() {
+        owner.reset(new Owner());
+        EXPECT_FALSE(owner->stuff_done());
+      },
+      RTC_FROM_HERE);
+  ASSERT_TRUE(owner);
+  tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
+  tq1.SendTask(
+      [&owner]() {
+        EXPECT_TRUE(owner->stuff_done());
+        owner.reset();
+      },
+      RTC_FROM_HERE);
+  ASSERT_FALSE(owner);
+}
+
+TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) {
+  TaskQueueForTest tq1("OwnerHere");
+  TaskQueueForTest tq2("OwnerNotHere");
+
+  class Owner {
+   public:
+    explicit Owner(bool* stuff_done)
+        : tq_main_(TaskQueueBase::Current()), stuff_done_(stuff_done) {
+      RTC_DCHECK(tq_main_);
+      *stuff_done_ = false;
+    }
+    ~Owner() {
+      RTC_DCHECK(tq_main_->IsCurrent());
+      flag_->SetNotAlive();
+    }
+
+    void DoStuff() {
+      RTC_DCHECK(!tq_main_->IsCurrent());
+      tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
+        if (!safe->alive())
+          return;
+        *stuff_done_ = true;
+      }));
+    }
+
+   private:
+    TaskQueueBase* const tq_main_;
+    bool* const stuff_done_;
+    PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+  };
+
+  std::unique_ptr<Owner> owner;
+  bool stuff_done = false;
+  tq1.SendTask([&owner, &stuff_done]() { owner.reset(new Owner(&stuff_done)); },
+               RTC_FROM_HERE);
+  ASSERT_TRUE(owner);
+  // Queue up a task on tq1 that will execute before the 'DoStuff' task
+  // can, and delete the |owner| before the 'stuff' task can execute.
+  rtc::Event blocker;
+  tq1.PostTask([&blocker, &owner]() {
+    blocker.Wait(rtc::Event::kForever);
+    owner.reset();
+  });
+
+  // Queue up a DoStuff...
+  tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
+
+  ASSERT_TRUE(owner);
+  blocker.Set();
+
+  // Run an empty task on tq1 to flush all the queued tasks.
+  tq1.SendTask([]() {}, RTC_FROM_HERE);
+  ASSERT_FALSE(owner);
+  EXPECT_FALSE(stuff_done);
+}
+}  // namespace webrtc
diff --git a/test/scenario/call_client.h b/test/scenario/call_client.h
index 803b4a8..33fa276 100644
--- a/test/scenario/call_client.h
+++ b/test/scenario/call_client.h
@@ -113,6 +113,11 @@
   void OnPacketReceived(EmulatedIpPacket packet) override;
   std::unique_ptr<RtcEventLogOutput> GetLogWriter(std::string name);
 
+  // Exposed publicly so that tests can execute tasks such as querying stats
+  // for media streams in the expected runtime environment (essentially what
+  // CallClient does internally for GetStats()).
+  void SendTask(std::function<void()> task);
+
  private:
   friend class Scenario;
   friend class CallClientPair;
@@ -129,7 +134,6 @@
   uint32_t GetNextAudioLocalSsrc();
   uint32_t GetNextRtxSsrc();
   void AddExtensions(std::vector<RtpExtension> extensions);
-  void SendTask(std::function<void()> task);
   int16_t Bind(EmulatedEndpoint* endpoint);
   void UnBind();
 
diff --git a/test/scenario/stats_collection_unittest.cc b/test/scenario/stats_collection_unittest.cc
index fae3365..af3b982 100644
--- a/test/scenario/stats_collection_unittest.cc
+++ b/test/scenario/stats_collection_unittest.cc
@@ -25,17 +25,26 @@
       VideoStreamConfig::Encoder::Implementation::kSoftware;
   config.hooks.frame_pair_handlers = {analyzer->Handler()};
   auto* caller = s->CreateClient("caller", CallClientConfig());
+  auto* callee = s->CreateClient("callee", CallClientConfig());
   auto route =
-      s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)},
-                      s->CreateClient("callee", CallClientConfig()),
+      s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, callee,
                       {s->CreateSimulationNode(NetworkSimulationConfig())});
-  auto* video = s->CreateVideoStream(route->forward(), config);
+  VideoStreamPair* video = s->CreateVideoStream(route->forward(), config);
   auto* audio = s->CreateAudioStream(route->forward(), AudioStreamConfig());
   s->Every(TimeDelta::Seconds(1), [=] {
     collectors->call.AddStats(caller->GetStats());
-    collectors->audio_receive.AddStats(audio->receive()->GetStats());
     collectors->video_send.AddStats(video->send()->GetStats(), s->Now());
-    collectors->video_receive.AddStats(video->receive()->GetStats());
+    collectors->audio_receive.AddStats(audio->receive()->GetStats());
+
+    // Querying the video stats from within the expected runtime environment
+    // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
+    // we're currently on).
+    VideoReceiveStream::Stats video_receive_stats;
+    auto* video_stream = video->receive();
+    callee->SendTask([&video_stream, &video_receive_stats]() {
+      video_receive_stats = video_stream->GetStats();
+    });
+    collectors->video_receive.AddStats(video_receive_stats);
   });
 }
 }  // namespace
diff --git a/video/BUILD.gn b/video/BUILD.gn
index 14109c3..9d26ee2 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -115,6 +115,7 @@
     "../rtc_base/experiments:rate_control_settings",
     "../rtc_base/synchronization:sequence_checker",
     "../rtc_base/system:thread_registry",
+    "../rtc_base/task_utils:pending_task_safety_flag",
     "../rtc_base/task_utils:repeating_task",
     "../rtc_base/task_utils:to_queued_task",
     "../rtc_base/time:timestamp_extrapolator",
diff --git a/video/end_to_end_tests/retransmission_tests.cc b/video/end_to_end_tests/retransmission_tests.cc
index 407aa5f..c28b129 100644
--- a/video/end_to_end_tests/retransmission_tests.cc
+++ b/video/end_to_end_tests/retransmission_tests.cc
@@ -18,8 +18,8 @@
 #include "call/simulated_network.h"
 #include "modules/rtp_rtcp/source/rtp_packet.h"
 #include "modules/video_coding/codecs/vp8/include/vp8.h"
+#include "rtc_base/event.h"
 #include "rtc_base/task_queue_for_test.h"
-#include "system_wrappers/include/sleep.h"
 #include "test/call_test.h"
 #include "test/field_trial.h"
 #include "test/gtest.h"
@@ -203,7 +203,7 @@
 
 TEST_F(RetransmissionEndToEndTest,
        StopSendingKeyframeRequestsForInactiveStream) {
-  class KeyframeRequestObserver : public test::EndToEndTest {
+  class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask {
    public:
     explicit KeyframeRequestObserver(TaskQueueBase* task_queue)
         : clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {}
@@ -216,28 +216,59 @@
       receive_stream_ = receive_streams[0];
     }
 
-    void PerformTest() override {
-      bool frame_decoded = false;
-      int64_t start_time = clock_->TimeInMilliseconds();
-      while (clock_->TimeInMilliseconds() - start_time <= 5000) {
-        if (receive_stream_->GetStats().frames_decoded > 0) {
-          frame_decoded = true;
-          break;
-        }
-        SleepMs(100);
+    Action OnReceiveRtcp(const uint8_t* packet, size_t length) override {
+      test::RtcpPacketParser parser;
+      EXPECT_TRUE(parser.Parse(packet, length));
+      if (parser.pli()->num_packets() > 0)
+        task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
+      return SEND_PACKET;
+    }
+
+    bool PollStats() {
+      if (receive_stream_->GetStats().frames_decoded > 0) {
+        frame_decoded_ = true;
+      } else if (clock_->TimeInMilliseconds() - start_time_ < 5000) {
+        task_queue_->PostDelayedTask(std::unique_ptr<QueuedTask>(this), 100);
+        return false;
       }
-      ASSERT_TRUE(frame_decoded);
-      SendTask(RTC_FROM_HERE, task_queue_, [this]() { send_stream_->Stop(); });
-      SleepMs(10000);
-      ASSERT_EQ(
-          1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
+      return true;
+    }
+
+    void PerformTest() override {
+      start_time_ = clock_->TimeInMilliseconds();
+      task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
+      test_done_.Wait(rtc::Event::kForever);
+    }
+
+    bool Run() override {
+      if (!frame_decoded_) {
+        if (PollStats()) {
+          send_stream_->Stop();
+          if (!frame_decoded_) {
+            test_done_.Set();
+          } else {
+            // Now we wait for the PLI packet. Once we receive it, a task
+            // will be posted (see OnReceiveRtcp) and we'll check the stats
+            // once more before signaling that we're done.
+          }
+        }
+      } else {
+        EXPECT_EQ(
+            1U,
+            receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
+        test_done_.Set();
+      }
+      return false;
     }
 
    private:
-    Clock* clock_;
+    Clock* const clock_;
     VideoSendStream* send_stream_;
     VideoReceiveStream* receive_stream_;
     TaskQueueBase* const task_queue_;
+    rtc::Event test_done_;
+    bool frame_decoded_ = false;
+    int64_t start_time_ = 0;
   } test(task_queue());
 
   RunBaseTest(&test);
diff --git a/video/end_to_end_tests/stats_tests.cc b/video/end_to_end_tests/stats_tests.cc
index b43f79d..32bcedb 100644
--- a/video/end_to_end_tests/stats_tests.cc
+++ b/video/end_to_end_tests/stats_tests.cc
@@ -297,6 +297,7 @@
         const std::vector<VideoReceiveStream*>& receive_streams) override {
       send_stream_ = send_stream;
       receive_streams_ = receive_streams;
+      task_queue_ = TaskQueueBase::Current();
     }
 
     void PerformTest() override {
@@ -307,8 +308,10 @@
       bool send_ok = false;
 
       while (now_ms < stop_time_ms) {
-        if (!receive_ok)
-          receive_ok = CheckReceiveStats();
+        if (!receive_ok && task_queue_) {
+          SendTask(RTC_FROM_HERE, task_queue_,
+                   [&]() { receive_ok = CheckReceiveStats(); });
+        }
         if (!send_ok)
           send_ok = CheckSendStats();
 
@@ -346,6 +349,7 @@
 
     rtc::Event check_stats_event_;
     ReceiveStreamRenderer receive_stream_renderer_;
+    TaskQueueBase* task_queue_ = nullptr;
   } test;
 
   RunBaseTest(&test);
@@ -377,22 +381,28 @@
         VideoSendStream* send_stream,
         const std::vector<VideoReceiveStream*>& receive_streams) override {
       receive_streams_ = receive_streams;
+      task_queue_ = TaskQueueBase::Current();
     }
 
     void PerformTest() override {
       // No frames reported initially.
-      for (const auto& receive_stream : receive_streams_) {
-        EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
-      }
+      SendTask(RTC_FROM_HERE, task_queue_, [&]() {
+        for (const auto& receive_stream : receive_streams_) {
+          EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
+        }
+      });
       // Wait for at least one timing frame to be sent with 100ms grace period.
       SleepMs(kDefaultTimingFramesDelayMs + 100);
       // Check that timing frames are reported for each stream.
-      for (const auto& receive_stream : receive_streams_) {
-        EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
-      }
+      SendTask(RTC_FROM_HERE, task_queue_, [&]() {
+        for (const auto& receive_stream : receive_streams_) {
+          EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
+        }
+      });
     }
 
     std::vector<VideoReceiveStream*> receive_streams_;
+    TaskQueueBase* task_queue_ = nullptr;
   } test;
 
   RunBaseTest(&test);
@@ -400,7 +410,8 @@
 
 TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
   static const size_t kNumRtpPacketsToSend = 5;
-  class ReceivedRtpStatsObserver : public test::EndToEndTest {
+  class ReceivedRtpStatsObserver : public test::EndToEndTest,
+                                   public QueuedTask {
    public:
     ReceivedRtpStatsObserver()
         : EndToEndTest(kDefaultTimeoutMs),
@@ -412,14 +423,14 @@
         VideoSendStream* send_stream,
         const std::vector<VideoReceiveStream*>& receive_streams) override {
       receive_stream_ = receive_streams[0];
+      task_queue_ = TaskQueueBase::Current();
+      EXPECT_TRUE(task_queue_ != nullptr);
     }
 
     Action OnSendRtp(const uint8_t* packet, size_t length) override {
       if (sent_rtp_ >= kNumRtpPacketsToSend) {
-        VideoReceiveStream::Stats stats = receive_stream_->GetStats();
-        if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
-          observation_complete_.Set();
-        }
+        // Need to check the stats on the correct thread.
+        task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
         return DROP_PACKET;
       }
       ++sent_rtp_;
@@ -431,8 +442,17 @@
           << "Timed out while verifying number of received RTP packets.";
     }
 
+    bool Run() override {
+      VideoReceiveStream::Stats stats = receive_stream_->GetStats();
+      if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
+        observation_complete_.Set();
+      }
+      return false;
+    }
+
     VideoReceiveStream* receive_stream_;
     uint32_t sent_rtp_;
+    TaskQueueBase* task_queue_ = nullptr;
   } test;
 
   RunBaseTest(&test);
@@ -578,7 +598,7 @@
 
 TEST_F(StatsEndToEndTest, VerifyNackStats) {
   static const int kPacketNumberToDrop = 200;
-  class NackObserver : public test::EndToEndTest {
+  class NackObserver : public test::EndToEndTest, public QueuedTask {
    public:
     NackObserver()
         : EndToEndTest(kLongTimeoutMs),
@@ -598,7 +618,7 @@
         dropped_rtp_packet_ = header.sequenceNumber;
         return DROP_PACKET;
       }
-      VerifyStats();
+      task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
       return SEND_PACKET;
     }
 
@@ -659,6 +679,14 @@
         const std::vector<VideoReceiveStream*>& receive_streams) override {
       send_stream_ = send_stream;
       receive_streams_ = receive_streams;
+      task_queue_ = TaskQueueBase::Current();
+      EXPECT_TRUE(task_queue_ != nullptr);
+    }
+
+    bool Run() override {
+      rtc::CritScope lock(&crit_);
+      VerifyStats();
+      return false;
     }
 
     void PerformTest() override {
@@ -673,6 +701,7 @@
     std::vector<VideoReceiveStream*> receive_streams_;
     VideoSendStream* send_stream_;
     absl::optional<int64_t> start_runtime_ms_;
+    TaskQueueBase* task_queue_ = nullptr;
   } test;
 
   metrics::Reset();