Reland of the test portion of:
https://webrtc-review.googlesource.com/c/src/+/172847
------------ original description --------------
Preparation for ReceiveStatisticsProxy lock reduction.
Update tests to call VideoReceiveStream::GetStats() in the same or at
least similar way it gets called in production (construction thread,
same TQ/thread).
Mapped out threads and context for ReceiveStatisticsProxy,
VideoQualityObserver and VideoReceiveStream. Added
follow-up TODOs for webrtc:11489.
One functional change in ReceiveStatisticsProxy is that when sender
side RtcpPacketTypesCounterUpdated calls are made, the counter is
updated asynchronously since the sender calls the method on a different
thread than the receiver.
Make CallClient::SendTask public to allow tests to run tasks in the
right context. CallClient already does this internally for GetStats.
Remove 10 sec sleep in StopSendingKeyframeRequestsForInactiveStream.
Bug: webrtc:11489
Change-Id: I491e13344b9fa714de0741dd927d907de7e39e83
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/173583
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31077}
diff --git a/BUILD.gn b/BUILD.gn
index b3e7710..4e30a71 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -547,6 +547,7 @@
"rtc_base:weak_ptr_unittests",
"rtc_base/experiments:experiments_unittests",
"rtc_base/synchronization:sequence_checker_unittests",
+ "rtc_base/task_utils:pending_task_safety_flag_unittests",
"rtc_base/task_utils:to_queued_task_unittests",
"sdk:sdk_tests",
"test:rtp_test_utils",
diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc
index 2d23087..123be7d 100644
--- a/call/call_perf_tests.cc
+++ b/call/call_perf_tests.cc
@@ -96,21 +96,24 @@
static const int kMinRunTimeMs = 30000;
public:
- explicit VideoRtcpAndSyncObserver(Clock* clock, const std::string& test_label)
+ explicit VideoRtcpAndSyncObserver(TaskQueueBase* task_queue,
+ Clock* clock,
+ const std::string& test_label)
: test::RtpRtcpObserver(CallPerfTest::kLongTimeoutMs),
clock_(clock),
test_label_(test_label),
creation_time_ms_(clock_->TimeInMilliseconds()),
- first_time_in_sync_(-1),
- receive_stream_(nullptr) {}
+ task_queue_(task_queue) {}
void OnFrame(const VideoFrame& video_frame) override {
- VideoReceiveStream::Stats stats;
- {
- rtc::CritScope lock(&crit_);
- if (receive_stream_)
- stats = receive_stream_->GetStats();
- }
+ task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
+ }
+
+ void CheckStats() {
+ if (!receive_stream_)
+ return;
+
+ VideoReceiveStream::Stats stats = receive_stream_->GetStats();
if (stats.sync_offset_ms == std::numeric_limits<int>::max())
return;
@@ -135,7 +138,8 @@
}
void set_receive_stream(VideoReceiveStream* receive_stream) {
- rtc::CritScope lock(&crit_);
+ RTC_DCHECK_EQ(task_queue_, TaskQueueBase::Current());
+ // Note that receive_stream may be nullptr.
receive_stream_ = receive_stream;
}
@@ -148,10 +152,10 @@
Clock* const clock_;
std::string test_label_;
const int64_t creation_time_ms_;
- int64_t first_time_in_sync_;
- rtc::CriticalSection crit_;
- VideoReceiveStream* receive_stream_ RTC_GUARDED_BY(crit_);
+ int64_t first_time_in_sync_ = -1;
+ VideoReceiveStream* receive_stream_ = nullptr;
std::vector<double> sync_offset_ms_list_;
+ TaskQueueBase* const task_queue_;
};
void CallPerfTest::TestAudioVideoSync(FecMode fec,
@@ -168,7 +172,8 @@
audio_net_config.queue_delay_ms = 500;
audio_net_config.loss_percent = 5;
- VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock(), test_label);
+ auto observer = std::make_unique<VideoRtcpAndSyncObserver>(
+ task_queue(), Clock::GetRealTimeClock(), test_label);
std::map<uint8_t, MediaType> audio_pt_map;
std::map<uint8_t, MediaType> video_pt_map;
@@ -218,7 +223,7 @@
});
audio_send_transport = std::make_unique<test::PacketTransport>(
- task_queue(), sender_call_.get(), &observer,
+ task_queue(), sender_call_.get(), observer.get(),
test::PacketTransport::kSender, audio_pt_map,
std::make_unique<FakeNetworkPipe>(
Clock::GetRealTimeClock(),
@@ -226,7 +231,7 @@
audio_send_transport->SetReceiver(receiver_call_->Receiver());
video_send_transport = std::make_unique<test::PacketTransport>(
- task_queue(), sender_call_.get(), &observer,
+ task_queue(), sender_call_.get(), observer.get(),
test::PacketTransport::kSender, video_pt_map,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@@ -234,7 +239,7 @@
video_send_transport->SetReceiver(receiver_call_->Receiver());
receive_transport = std::make_unique<test::PacketTransport>(
- task_queue(), receiver_call_.get(), &observer,
+ task_queue(), receiver_call_.get(), observer.get(),
test::PacketTransport::kReceiver, payload_type_map_,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@@ -259,7 +264,7 @@
video_receive_configs_[0].rtp.ulpfec_payload_type = kUlpfecPayloadType;
}
video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000;
- video_receive_configs_[0].renderer = &observer;
+ video_receive_configs_[0].renderer = observer.get();
video_receive_configs_[0].sync_group = kSyncGroup;
AudioReceiveStream::Config audio_recv_config;
@@ -281,7 +286,7 @@
receiver_call_->CreateAudioReceiveStream(audio_recv_config);
}
EXPECT_EQ(1u, video_receive_streams_.size());
- observer.set_receive_stream(video_receive_streams_[0]);
+ observer->set_receive_stream(video_receive_streams_[0]);
drifting_clock = std::make_unique<DriftingClock>(clock_, video_ntp_speed);
CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed,
kDefaultFramerate, kDefaultWidth,
@@ -293,10 +298,13 @@
audio_receive_stream->Start();
});
- EXPECT_TRUE(observer.Wait())
+ EXPECT_TRUE(observer->Wait())
<< "Timed out while waiting for audio and video to be synchronized.";
SendTask(RTC_FROM_HERE, task_queue(), [&]() {
+ // Clear the pointer to the receive stream since it will now be deleted.
+ observer->set_receive_stream(nullptr);
+
audio_send_stream->Stop();
audio_receive_stream->Stop();
@@ -314,7 +322,7 @@
DestroyCalls();
});
- observer.PrintResults();
+ observer->PrintResults();
// In quick test synchronization may not be achieved in time.
if (!field_trial::IsEnabled("WebRTC-QuickPerfTest")) {
@@ -323,6 +331,9 @@
EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.AVSyncOffsetInMs"));
#endif
}
+
+ task_queue()->PostTask(
+ ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
}
TEST_F(CallPerfTest, PlaysOutAudioAndVideoInSyncWithoutClockDrift) {
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index 951cd4e..d7d7034 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -526,9 +526,9 @@
test::NetworkSimulationConfig net_conf;
net_conf.bandwidth = DataRate::KilobitsPerSec(300);
auto send_node = s.CreateSimulationNode(net_conf);
+ auto* callee = s.CreateClient("return", call_conf);
auto* route = s.CreateRoutes(s.CreateClient("send", call_conf), {send_node},
- s.CreateClient("return", call_conf),
- {s.CreateSimulationNode(net_conf)});
+ callee, {s.CreateSimulationNode(net_conf)});
test::VideoStreamConfig lossy_config;
lossy_config.source.framerate = 5;
@@ -556,14 +556,20 @@
// from initial probing.
s.RunFor(TimeDelta::Seconds(1));
rtx_packets = 0;
- int decoded_baseline = lossy->receive()->GetStats().frames_decoded;
+ int decoded_baseline = 0;
+ callee->SendTask([&decoded_baseline, &lossy]() {
+ decoded_baseline = lossy->receive()->GetStats().frames_decoded;
+ });
s.RunFor(TimeDelta::Seconds(1));
// We expect both that RTX packets were sent and that an appropriate number of
// frames were received. This is somewhat redundant but reduces the risk of
// false positives in future regressions (e.g. RTX is send due to probing).
EXPECT_GE(rtx_packets, 1);
- int frames_decoded =
- lossy->receive()->GetStats().frames_decoded - decoded_baseline;
+ int frames_decoded = 0;
+ callee->SendTask([&decoded_baseline, &frames_decoded, &lossy]() {
+ frames_decoded =
+ lossy->receive()->GetStats().frames_decoded - decoded_baseline;
+ });
EXPECT_EQ(frames_decoded, 5);
}
diff --git a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
index 1083214..361da92 100644
--- a/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
+++ b/modules/congestion_controller/goog_cc/goog_cc_network_control_unittest.cc
@@ -537,8 +537,8 @@
auto ret_net = {s.CreateSimulationNode(net_conf)};
auto* client = s.CreateClient("send", CallClientConfig());
- auto* route = s.CreateRoutes(
- client, send_net, s.CreateClient("return", CallClientConfig()), ret_net);
+ auto* callee = s.CreateClient("return", CallClientConfig());
+ auto* route = s.CreateRoutes(client, send_net, callee, ret_net);
// TODO(srte): Make this work with RTX enabled or remove it.
auto* video = s.CreateVideoStream(route->forward(), [](VideoStreamConfig* c) {
c->stream.use_rtx = false;
@@ -553,9 +553,17 @@
s.net()->StopCrossTraffic(tcp_traffic);
s.RunFor(TimeDelta::Seconds(20));
}
- return DataSize::Bytes(video->receive()
- ->GetStats()
- .rtp_stats.packet_counter.TotalBytes()) /
+
+ // Querying the video stats from within the expected runtime environment
+ // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
+ // we're currently on).
+ VideoReceiveStream::Stats video_receive_stats;
+ auto* video_stream = video->receive();
+ callee->SendTask([&video_stream, &video_receive_stats]() {
+ video_receive_stats = video_stream->GetStats();
+ });
+ return DataSize::Bytes(
+ video_receive_stats.rtp_stats.packet_counter.TotalBytes()) /
s.TimeSinceStart();
}
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn
index 2e7d53c..8409aa2 100644
--- a/rtc_base/task_utils/BUILD.gn
+++ b/rtc_base/task_utils/BUILD.gn
@@ -26,12 +26,39 @@
]
}
+rtc_library("pending_task_safety_flag") {
+ sources = [
+ "pending_task_safety_flag.cc",
+ "pending_task_safety_flag.h",
+ ]
+ deps = [
+ "..:checks",
+ "..:refcount",
+ "..:thread_checker",
+ "../../api:scoped_refptr",
+ "../synchronization:sequence_checker",
+ ]
+}
+
rtc_source_set("to_queued_task") {
sources = [ "to_queued_task.h" ]
deps = [ "../../api/task_queue" ]
}
if (rtc_include_tests) {
+ rtc_library("pending_task_safety_flag_unittests") {
+ testonly = true
+ sources = [ "pending_task_safety_flag_unittest.cc" ]
+ deps = [
+ ":pending_task_safety_flag",
+ ":to_queued_task",
+ "..:rtc_base_approved",
+ "..:rtc_task_queue",
+ "..:task_queue_for_test",
+ "../../test:test_support",
+ ]
+ }
+
rtc_library("repeating_task_unittests") {
testonly = true
sources = [ "repeating_task_unittest.cc" ]
diff --git a/rtc_base/task_utils/pending_task_safety_flag.cc b/rtc_base/task_utils/pending_task_safety_flag.cc
new file mode 100644
index 0000000..307d2d5
--- /dev/null
+++ b/rtc_base/task_utils/pending_task_safety_flag.cc
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+
+#include "rtc_base/ref_counted_object.h"
+
+namespace webrtc {
+
+// static
+PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
+ return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
+}
+
+void PendingTaskSafetyFlag::SetNotAlive() {
+ RTC_DCHECK_RUN_ON(&main_sequence_);
+ alive_ = false;
+}
+
+bool PendingTaskSafetyFlag::alive() const {
+ RTC_DCHECK_RUN_ON(&main_sequence_);
+ return alive_;
+}
+
+} // namespace webrtc
diff --git a/rtc_base/task_utils/pending_task_safety_flag.h b/rtc_base/task_utils/pending_task_safety_flag.h
new file mode 100644
index 0000000..1b301c8
--- /dev/null
+++ b/rtc_base/task_utils/pending_task_safety_flag.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2020 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
+#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
+
+#include "api/scoped_refptr.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/ref_count.h"
+#include "rtc_base/synchronization/sequence_checker.h"
+
+namespace webrtc {
+
+// Use this flag to drop pending tasks that have been posted to the "main"
+// thread/TQ and end up running after the owning instance has been
+// deleted. The owning instance signals deletion by calling SetNotAlive() from
+// its destructor.
+//
+// When posting a task, post a copy (capture by-value in a lambda) of the flag
+// instance and before performing the work, check the |alive()| state. Abort if
+// alive() returns |false|:
+//
+// // Running outside of the main thread.
+// my_task_queue_->PostTask(ToQueuedTask(
+// [safety = pending_task_safety_flag_, this]() {
+// // Now running on the main thread.
+// if (!safety->alive())
+// return;
+// MyMethod();
+// }));
+//
+// Note that checking the state only works on the construction/destruction
+// thread of the ReceiveStatisticsProxy instance.
+class PendingTaskSafetyFlag : public rtc::RefCountInterface {
+ public:
+ using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
+ static Pointer Create();
+
+ ~PendingTaskSafetyFlag() = default;
+
+ void SetNotAlive();
+ bool alive() const;
+
+ protected:
+ PendingTaskSafetyFlag() = default;
+
+ private:
+ bool alive_ = true;
+ SequenceChecker main_sequence_;
+};
+
+} // namespace webrtc
+
+#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
diff --git a/rtc_base/task_utils/pending_task_safety_flag_unittest.cc b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
new file mode 100644
index 0000000..0c1c3c8
--- /dev/null
+++ b/rtc_base/task_utils/pending_task_safety_flag_unittest.cc
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+
+#include <memory>
+
+#include "rtc_base/event.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/task_queue_for_test.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace {
+using ::testing::AtLeast;
+using ::testing::Invoke;
+using ::testing::MockFunction;
+using ::testing::NiceMock;
+using ::testing::Return;
+} // namespace
+
+TEST(PendingTaskSafetyFlagTest, Basic) {
+ PendingTaskSafetyFlag::Pointer safety_flag;
+ {
+ // Scope for the |owner| instance.
+ class Owner {
+ public:
+ Owner() = default;
+ ~Owner() { flag_->SetNotAlive(); }
+
+ PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+ } owner;
+ EXPECT_TRUE(owner.flag_->alive());
+ safety_flag = owner.flag_;
+ EXPECT_TRUE(safety_flag->alive());
+ }
+ EXPECT_FALSE(safety_flag->alive());
+}
+
+TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) {
+ TaskQueueForTest tq1("OwnerHere");
+ TaskQueueForTest tq2("OwnerNotHere");
+
+ class Owner {
+ public:
+ Owner() : tq_main_(TaskQueueBase::Current()) { RTC_DCHECK(tq_main_); }
+ ~Owner() {
+ RTC_DCHECK(tq_main_->IsCurrent());
+ flag_->SetNotAlive();
+ }
+
+ void DoStuff() {
+ RTC_DCHECK(!tq_main_->IsCurrent());
+ tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
+ if (!safe->alive())
+ return;
+ stuff_done_ = true;
+ }));
+ }
+
+ bool stuff_done() const { return stuff_done_; }
+
+ private:
+ TaskQueueBase* const tq_main_;
+ bool stuff_done_ = false;
+ PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+ };
+
+ std::unique_ptr<Owner> owner;
+ tq1.SendTask(
+ [&owner]() {
+ owner.reset(new Owner());
+ EXPECT_FALSE(owner->stuff_done());
+ },
+ RTC_FROM_HERE);
+ ASSERT_TRUE(owner);
+ tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
+ tq1.SendTask(
+ [&owner]() {
+ EXPECT_TRUE(owner->stuff_done());
+ owner.reset();
+ },
+ RTC_FROM_HERE);
+ ASSERT_FALSE(owner);
+}
+
+TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) {
+ TaskQueueForTest tq1("OwnerHere");
+ TaskQueueForTest tq2("OwnerNotHere");
+
+ class Owner {
+ public:
+ explicit Owner(bool* stuff_done)
+ : tq_main_(TaskQueueBase::Current()), stuff_done_(stuff_done) {
+ RTC_DCHECK(tq_main_);
+ *stuff_done_ = false;
+ }
+ ~Owner() {
+ RTC_DCHECK(tq_main_->IsCurrent());
+ flag_->SetNotAlive();
+ }
+
+ void DoStuff() {
+ RTC_DCHECK(!tq_main_->IsCurrent());
+ tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
+ if (!safe->alive())
+ return;
+ *stuff_done_ = true;
+ }));
+ }
+
+ private:
+ TaskQueueBase* const tq_main_;
+ bool* const stuff_done_;
+ PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
+ };
+
+ std::unique_ptr<Owner> owner;
+ bool stuff_done = false;
+ tq1.SendTask([&owner, &stuff_done]() { owner.reset(new Owner(&stuff_done)); },
+ RTC_FROM_HERE);
+ ASSERT_TRUE(owner);
+ // Queue up a task on tq1 that will execute before the 'DoStuff' task
+ // can, and delete the |owner| before the 'stuff' task can execute.
+ rtc::Event blocker;
+ tq1.PostTask([&blocker, &owner]() {
+ blocker.Wait(rtc::Event::kForever);
+ owner.reset();
+ });
+
+ // Queue up a DoStuff...
+ tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
+
+ ASSERT_TRUE(owner);
+ blocker.Set();
+
+ // Run an empty task on tq1 to flush all the queued tasks.
+ tq1.SendTask([]() {}, RTC_FROM_HERE);
+ ASSERT_FALSE(owner);
+ EXPECT_FALSE(stuff_done);
+}
+} // namespace webrtc
diff --git a/test/scenario/call_client.h b/test/scenario/call_client.h
index 803b4a8..33fa276 100644
--- a/test/scenario/call_client.h
+++ b/test/scenario/call_client.h
@@ -113,6 +113,11 @@
void OnPacketReceived(EmulatedIpPacket packet) override;
std::unique_ptr<RtcEventLogOutput> GetLogWriter(std::string name);
+ // Exposed publicly so that tests can execute tasks such as querying stats
+ // for media streams in the expected runtime environment (essentially what
+ // CallClient does internally for GetStats()).
+ void SendTask(std::function<void()> task);
+
private:
friend class Scenario;
friend class CallClientPair;
@@ -129,7 +134,6 @@
uint32_t GetNextAudioLocalSsrc();
uint32_t GetNextRtxSsrc();
void AddExtensions(std::vector<RtpExtension> extensions);
- void SendTask(std::function<void()> task);
int16_t Bind(EmulatedEndpoint* endpoint);
void UnBind();
diff --git a/test/scenario/stats_collection_unittest.cc b/test/scenario/stats_collection_unittest.cc
index fae3365..af3b982 100644
--- a/test/scenario/stats_collection_unittest.cc
+++ b/test/scenario/stats_collection_unittest.cc
@@ -25,17 +25,26 @@
VideoStreamConfig::Encoder::Implementation::kSoftware;
config.hooks.frame_pair_handlers = {analyzer->Handler()};
auto* caller = s->CreateClient("caller", CallClientConfig());
+ auto* callee = s->CreateClient("callee", CallClientConfig());
auto route =
- s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)},
- s->CreateClient("callee", CallClientConfig()),
+ s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, callee,
{s->CreateSimulationNode(NetworkSimulationConfig())});
- auto* video = s->CreateVideoStream(route->forward(), config);
+ VideoStreamPair* video = s->CreateVideoStream(route->forward(), config);
auto* audio = s->CreateAudioStream(route->forward(), AudioStreamConfig());
s->Every(TimeDelta::Seconds(1), [=] {
collectors->call.AddStats(caller->GetStats());
- collectors->audio_receive.AddStats(audio->receive()->GetStats());
collectors->video_send.AddStats(video->send()->GetStats(), s->Now());
- collectors->video_receive.AddStats(video->receive()->GetStats());
+ collectors->audio_receive.AddStats(audio->receive()->GetStats());
+
+ // Querying the video stats from within the expected runtime environment
+ // (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
+ // we're currently on).
+ VideoReceiveStream::Stats video_receive_stats;
+ auto* video_stream = video->receive();
+ callee->SendTask([&video_stream, &video_receive_stats]() {
+ video_receive_stats = video_stream->GetStats();
+ });
+ collectors->video_receive.AddStats(video_receive_stats);
});
}
} // namespace
diff --git a/video/BUILD.gn b/video/BUILD.gn
index 14109c3..9d26ee2 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -115,6 +115,7 @@
"../rtc_base/experiments:rate_control_settings",
"../rtc_base/synchronization:sequence_checker",
"../rtc_base/system:thread_registry",
+ "../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:repeating_task",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/time:timestamp_extrapolator",
diff --git a/video/end_to_end_tests/retransmission_tests.cc b/video/end_to_end_tests/retransmission_tests.cc
index 407aa5f..c28b129 100644
--- a/video/end_to_end_tests/retransmission_tests.cc
+++ b/video/end_to_end_tests/retransmission_tests.cc
@@ -18,8 +18,8 @@
#include "call/simulated_network.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "modules/video_coding/codecs/vp8/include/vp8.h"
+#include "rtc_base/event.h"
#include "rtc_base/task_queue_for_test.h"
-#include "system_wrappers/include/sleep.h"
#include "test/call_test.h"
#include "test/field_trial.h"
#include "test/gtest.h"
@@ -203,7 +203,7 @@
TEST_F(RetransmissionEndToEndTest,
StopSendingKeyframeRequestsForInactiveStream) {
- class KeyframeRequestObserver : public test::EndToEndTest {
+ class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask {
public:
explicit KeyframeRequestObserver(TaskQueueBase* task_queue)
: clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {}
@@ -216,28 +216,59 @@
receive_stream_ = receive_streams[0];
}
- void PerformTest() override {
- bool frame_decoded = false;
- int64_t start_time = clock_->TimeInMilliseconds();
- while (clock_->TimeInMilliseconds() - start_time <= 5000) {
- if (receive_stream_->GetStats().frames_decoded > 0) {
- frame_decoded = true;
- break;
- }
- SleepMs(100);
+ Action OnReceiveRtcp(const uint8_t* packet, size_t length) override {
+ test::RtcpPacketParser parser;
+ EXPECT_TRUE(parser.Parse(packet, length));
+ if (parser.pli()->num_packets() > 0)
+ task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
+ return SEND_PACKET;
+ }
+
+ bool PollStats() {
+ if (receive_stream_->GetStats().frames_decoded > 0) {
+ frame_decoded_ = true;
+ } else if (clock_->TimeInMilliseconds() - start_time_ < 5000) {
+ task_queue_->PostDelayedTask(std::unique_ptr<QueuedTask>(this), 100);
+ return false;
}
- ASSERT_TRUE(frame_decoded);
- SendTask(RTC_FROM_HERE, task_queue_, [this]() { send_stream_->Stop(); });
- SleepMs(10000);
- ASSERT_EQ(
- 1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
+ return true;
+ }
+
+ void PerformTest() override {
+ start_time_ = clock_->TimeInMilliseconds();
+ task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
+ test_done_.Wait(rtc::Event::kForever);
+ }
+
+ bool Run() override {
+ if (!frame_decoded_) {
+ if (PollStats()) {
+ send_stream_->Stop();
+ if (!frame_decoded_) {
+ test_done_.Set();
+ } else {
+ // Now we wait for the PLI packet. Once we receive it, a task
+ // will be posted (see OnReceiveRtcp) and we'll check the stats
+ // once more before signaling that we're done.
+ }
+ }
+ } else {
+ EXPECT_EQ(
+ 1U,
+ receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
+ test_done_.Set();
+ }
+ return false;
}
private:
- Clock* clock_;
+ Clock* const clock_;
VideoSendStream* send_stream_;
VideoReceiveStream* receive_stream_;
TaskQueueBase* const task_queue_;
+ rtc::Event test_done_;
+ bool frame_decoded_ = false;
+ int64_t start_time_ = 0;
} test(task_queue());
RunBaseTest(&test);
diff --git a/video/end_to_end_tests/stats_tests.cc b/video/end_to_end_tests/stats_tests.cc
index b43f79d..32bcedb 100644
--- a/video/end_to_end_tests/stats_tests.cc
+++ b/video/end_to_end_tests/stats_tests.cc
@@ -297,6 +297,7 @@
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
+ task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
@@ -307,8 +308,10 @@
bool send_ok = false;
while (now_ms < stop_time_ms) {
- if (!receive_ok)
- receive_ok = CheckReceiveStats();
+ if (!receive_ok && task_queue_) {
+ SendTask(RTC_FROM_HERE, task_queue_,
+ [&]() { receive_ok = CheckReceiveStats(); });
+ }
if (!send_ok)
send_ok = CheckSendStats();
@@ -346,6 +349,7 @@
rtc::Event check_stats_event_;
ReceiveStreamRenderer receive_stream_renderer_;
+ TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@@ -377,22 +381,28 @@
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_streams_ = receive_streams;
+ task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
// No frames reported initially.
- for (const auto& receive_stream : receive_streams_) {
- EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
- }
+ SendTask(RTC_FROM_HERE, task_queue_, [&]() {
+ for (const auto& receive_stream : receive_streams_) {
+ EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
+ }
+ });
// Wait for at least one timing frame to be sent with 100ms grace period.
SleepMs(kDefaultTimingFramesDelayMs + 100);
// Check that timing frames are reported for each stream.
- for (const auto& receive_stream : receive_streams_) {
- EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
- }
+ SendTask(RTC_FROM_HERE, task_queue_, [&]() {
+ for (const auto& receive_stream : receive_streams_) {
+ EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
+ }
+ });
}
std::vector<VideoReceiveStream*> receive_streams_;
+ TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@@ -400,7 +410,8 @@
TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
static const size_t kNumRtpPacketsToSend = 5;
- class ReceivedRtpStatsObserver : public test::EndToEndTest {
+ class ReceivedRtpStatsObserver : public test::EndToEndTest,
+ public QueuedTask {
public:
ReceivedRtpStatsObserver()
: EndToEndTest(kDefaultTimeoutMs),
@@ -412,14 +423,14 @@
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_stream_ = receive_streams[0];
+ task_queue_ = TaskQueueBase::Current();
+ EXPECT_TRUE(task_queue_ != nullptr);
}
Action OnSendRtp(const uint8_t* packet, size_t length) override {
if (sent_rtp_ >= kNumRtpPacketsToSend) {
- VideoReceiveStream::Stats stats = receive_stream_->GetStats();
- if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
- observation_complete_.Set();
- }
+ // Need to check the stats on the correct thread.
+ task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
return DROP_PACKET;
}
++sent_rtp_;
@@ -431,8 +442,17 @@
<< "Timed out while verifying number of received RTP packets.";
}
+ bool Run() override {
+ VideoReceiveStream::Stats stats = receive_stream_->GetStats();
+ if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
+ observation_complete_.Set();
+ }
+ return false;
+ }
+
VideoReceiveStream* receive_stream_;
uint32_t sent_rtp_;
+ TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@@ -578,7 +598,7 @@
TEST_F(StatsEndToEndTest, VerifyNackStats) {
static const int kPacketNumberToDrop = 200;
- class NackObserver : public test::EndToEndTest {
+ class NackObserver : public test::EndToEndTest, public QueuedTask {
public:
NackObserver()
: EndToEndTest(kLongTimeoutMs),
@@ -598,7 +618,7 @@
dropped_rtp_packet_ = header.sequenceNumber;
return DROP_PACKET;
}
- VerifyStats();
+ task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
return SEND_PACKET;
}
@@ -659,6 +679,14 @@
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
+ task_queue_ = TaskQueueBase::Current();
+ EXPECT_TRUE(task_queue_ != nullptr);
+ }
+
+ bool Run() override {
+ rtc::CritScope lock(&crit_);
+ VerifyStats();
+ return false;
}
void PerformTest() override {
@@ -673,6 +701,7 @@
std::vector<VideoReceiveStream*> receive_streams_;
VideoSendStream* send_stream_;
absl::optional<int64_t> start_runtime_ms_;
+ TaskQueueBase* task_queue_ = nullptr;
} test;
metrics::Reset();