Reduce locking for CallStats (preparation for TaskQueue).
Reduce synchronization in the class significantly and not hold a lock
while calling out to external implementations.
* Rewrite tests to use a real ProcessThread.
* Update some code to use C++ 11 constructs & library features.
Bug: webrtc:9064
Change-Id: I240a819efb6ef8197da3f2edf7acf068d2a27e8b
Reviewed-on: https://webrtc-review.googlesource.com/64521
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#22649}
diff --git a/call/call.cc b/call/call.cc
index 2afabf3..a1d1ec0 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -420,7 +420,7 @@
: clock_(Clock::GetRealTimeClock()),
num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
module_process_thread_(ProcessThread::Create("ModuleProcessThread")),
- call_stats_(new CallStats(clock_)),
+ call_stats_(new CallStats(clock_, module_process_thread_.get())),
bitrate_allocator_(new BitrateAllocator(this)),
config_(config),
audio_network_state_(kNetworkDown),
@@ -592,8 +592,7 @@
AudioSendStream* send_stream = new AudioSendStream(
config, config_.audio_state, &worker_queue_, module_process_thread_.get(),
transport_send_.get(), bitrate_allocator_.get(), event_log_,
- call_stats_->rtcp_rtt_stats(), suspended_rtp_state,
- &sent_rtp_audio_timer_ms_);
+ call_stats_.get(), suspended_rtp_state, &sent_rtp_audio_timer_ms_);
{
WriteLockScoped write_lock(*send_crit_);
RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
@@ -872,7 +871,7 @@
// this locked scope.
receive_stream = new FlexfecReceiveStreamImpl(
&video_receiver_controller_, config, recovered_packet_receiver,
- call_stats_->rtcp_rtt_stats(), module_process_thread_.get());
+ call_stats_.get(), module_process_thread_.get());
RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
receive_rtp_config_.end());
@@ -933,7 +932,7 @@
aggregate_network_up_ ? transport_send_->GetPacerQueuingDelayMs() : 0;
}
- stats.rtt_ms = call_stats_->rtcp_rtt_stats()->LastProcessedRtt();
+ stats.rtt_ms = call_stats_->LastProcessedRtt();
{
rtc::CritScope cs(&bitrate_crit_);
stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
diff --git a/video/BUILD.gn b/video/BUILD.gn
index f3d2c6d..49e1865 100644
--- a/video/BUILD.gn
+++ b/video/BUILD.gn
@@ -372,6 +372,7 @@
"../rtc_base:rtc_base_approved",
"../rtc_base:rtc_base_tests_utils",
"../rtc_base:rtc_numerics",
+ "../rtc_base:rtc_task_queue",
"../rtc_base/experiments:alr_experiment",
"../system_wrappers",
"../system_wrappers:field_trial_default",
diff --git a/video/call_stats.cc b/video/call_stats.cc
index c9f0199..8948e68 100644
--- a/video/call_stats.cc
+++ b/video/call_stats.cc
@@ -12,177 +12,209 @@
#include <algorithm>
-#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/constructormagic.h"
+#include "rtc_base/location.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/task_queue.h"
#include "system_wrappers/include/metrics.h"
namespace webrtc {
namespace {
-// Time interval for updating the observers.
-const int64_t kUpdateIntervalMs = 1000;
-// Weight factor to apply to the average rtt.
-const float kWeightFactor = 0.3f;
void RemoveOldReports(int64_t now, std::list<CallStats::RttTime>* reports) {
- // A rtt report is considered valid for this long.
- const int64_t kRttTimeoutMs = 1500;
- while (!reports->empty() &&
- (now - reports->front().time) > kRttTimeoutMs) {
- reports->pop_front();
- }
+ static constexpr const int64_t kRttTimeoutMs = 1500;
+ reports->remove_if(
+ [&now](CallStats::RttTime& r) { return now - r.time > kRttTimeoutMs; });
}
-int64_t GetMaxRttMs(std::list<CallStats::RttTime>* reports) {
- if (reports->empty())
- return -1;
- int64_t max_rtt_ms = 0;
- for (const CallStats::RttTime& rtt_time : *reports)
+int64_t GetMaxRttMs(const std::list<CallStats::RttTime>& reports) {
+ int64_t max_rtt_ms = -1;
+ for (const CallStats::RttTime& rtt_time : reports)
max_rtt_ms = std::max(rtt_time.rtt, max_rtt_ms);
return max_rtt_ms;
}
-int64_t GetAvgRttMs(std::list<CallStats::RttTime>* reports) {
- if (reports->empty()) {
- return -1;
- }
+int64_t GetAvgRttMs(const std::list<CallStats::RttTime>& reports) {
+ RTC_DCHECK(!reports.empty());
int64_t sum = 0;
- for (std::list<CallStats::RttTime>::const_iterator it = reports->begin();
- it != reports->end(); ++it) {
+ for (std::list<CallStats::RttTime>::const_iterator it = reports.begin();
+ it != reports.end(); ++it) {
sum += it->rtt;
}
- return sum / reports->size();
+ return sum / reports.size();
}
-void UpdateAvgRttMs(std::list<CallStats::RttTime>* reports, int64_t* avg_rtt) {
+int64_t GetNewAvgRttMs(const std::list<CallStats::RttTime>& reports,
+ int64_t prev_avg_rtt) {
+ if (reports.empty())
+ return -1; // Reset (invalid average).
+
int64_t cur_rtt_ms = GetAvgRttMs(reports);
- if (cur_rtt_ms == -1) {
- // Reset.
- *avg_rtt = -1;
- return;
- }
- if (*avg_rtt == -1) {
- // Initialize.
- *avg_rtt = cur_rtt_ms;
- return;
- }
- *avg_rtt = *avg_rtt * (1.0f - kWeightFactor) + cur_rtt_ms * kWeightFactor;
+ if (prev_avg_rtt == -1)
+ return cur_rtt_ms; // New initial average value.
+
+ // Weight factor to apply to the average rtt.
+ // We weigh the old average at 70% against the new average (30%).
+ constexpr const float kWeightFactor = 0.3f;
+ return prev_avg_rtt * (1.0f - kWeightFactor) + cur_rtt_ms * kWeightFactor;
}
-} // namespace
-class RtcpObserver : public RtcpRttStats {
+// This class is used to de-register a Module from a ProcessThread to satisfy
+// threading requirements of the Module (CallStats).
+// The guarantee offered by TemporaryDeregistration is that while its in scope,
+// no calls to |TimeUntilNextProcess| or |Process()| will occur and therefore
+// synchronization with those methods, is not necessary.
+class TemporaryDeregistration {
public:
- explicit RtcpObserver(CallStats* owner) : owner_(owner) {}
- virtual ~RtcpObserver() {}
-
- virtual void OnRttUpdate(int64_t rtt) {
- owner_->OnRttUpdate(rtt);
+ TemporaryDeregistration(Module* module,
+ ProcessThread* process_thread,
+ bool thread_running)
+ : module_(module),
+ process_thread_(process_thread),
+ deregistered_(thread_running) {
+ if (thread_running)
+ process_thread_->DeRegisterModule(module_);
}
-
- // Returns the average RTT.
- virtual int64_t LastProcessedRtt() const {
- return owner_->avg_rtt_ms();
+ ~TemporaryDeregistration() {
+ if (deregistered_)
+ process_thread_->RegisterModule(module_, RTC_FROM_HERE);
}
private:
- CallStats* owner_;
-
- RTC_DISALLOW_COPY_AND_ASSIGN(RtcpObserver);
+ Module* const module_;
+ ProcessThread* const process_thread_;
+ const bool deregistered_;
};
-CallStats::CallStats(Clock* clock)
+} // namespace
+
+CallStats::CallStats(Clock* clock, ProcessThread* process_thread)
: clock_(clock),
- rtcp_rtt_stats_(new RtcpObserver(this)),
last_process_time_(clock_->TimeInMilliseconds()),
max_rtt_ms_(-1),
avg_rtt_ms_(-1),
sum_avg_rtt_ms_(0),
num_avg_rtt_(0),
- time_of_first_rtt_ms_(-1) {}
+ time_of_first_rtt_ms_(-1),
+ process_thread_(process_thread),
+ process_thread_running_(false) {
+ RTC_DCHECK(process_thread_);
+ process_thread_checker_.DetachFromThread();
+}
CallStats::~CallStats() {
+ RTC_DCHECK_RUN_ON(&construction_thread_checker_);
+ RTC_DCHECK(!process_thread_running_);
RTC_DCHECK(observers_.empty());
+
UpdateHistograms();
}
int64_t CallStats::TimeUntilNextProcess() {
+ RTC_DCHECK_RUN_ON(&process_thread_checker_);
return last_process_time_ + kUpdateIntervalMs - clock_->TimeInMilliseconds();
}
void CallStats::Process() {
- rtc::CritScope cs(&crit_);
+ RTC_DCHECK_RUN_ON(&process_thread_checker_);
int64_t now = clock_->TimeInMilliseconds();
- if (now < last_process_time_ + kUpdateIntervalMs)
- return;
-
last_process_time_ = now;
+ int64_t avg_rtt_ms = avg_rtt_ms_;
RemoveOldReports(now, &reports_);
- max_rtt_ms_ = GetMaxRttMs(&reports_);
- UpdateAvgRttMs(&reports_, &avg_rtt_ms_);
+ max_rtt_ms_ = GetMaxRttMs(reports_);
+ avg_rtt_ms = GetNewAvgRttMs(reports_, avg_rtt_ms);
+ {
+ rtc::CritScope lock(&avg_rtt_ms_lock_);
+ avg_rtt_ms_ = avg_rtt_ms;
+ }
// If there is a valid rtt, update all observers with the max rtt.
if (max_rtt_ms_ >= 0) {
- RTC_DCHECK_GE(avg_rtt_ms_, 0);
- for (std::list<CallStatsObserver*>::iterator it = observers_.begin();
- it != observers_.end(); ++it) {
- (*it)->OnRttUpdate(avg_rtt_ms_, max_rtt_ms_);
- }
+ RTC_DCHECK_GE(avg_rtt_ms, 0);
+ for (CallStatsObserver* observer : observers_)
+ observer->OnRttUpdate(avg_rtt_ms, max_rtt_ms_);
// Sum for Histogram of average RTT reported over the entire call.
- sum_avg_rtt_ms_ += avg_rtt_ms_;
+ sum_avg_rtt_ms_ += avg_rtt_ms;
++num_avg_rtt_;
}
}
-int64_t CallStats::avg_rtt_ms() const {
- rtc::CritScope cs(&crit_);
- return avg_rtt_ms_;
-}
+void CallStats::ProcessThreadAttached(ProcessThread* process_thread) {
+ RTC_DCHECK_RUN_ON(&construction_thread_checker_);
+ RTC_DCHECK(!process_thread || process_thread_ == process_thread);
+ process_thread_running_ = process_thread != nullptr;
-RtcpRttStats* CallStats::rtcp_rtt_stats() const {
- return rtcp_rtt_stats_.get();
+ // Whether we just got attached or detached, we clear the
+ // |process_thread_checker_| so that it can be used to protect variables
+ // in either the process thread when it starts again, or UpdateHistograms()
+ // (mutually exclusive).
+ process_thread_checker_.DetachFromThread();
}
void CallStats::RegisterStatsObserver(CallStatsObserver* observer) {
- rtc::CritScope cs(&crit_);
- for (std::list<CallStatsObserver*>::iterator it = observers_.begin();
- it != observers_.end(); ++it) {
- if (*it == observer)
- return;
- }
- observers_.push_back(observer);
+ RTC_DCHECK_RUN_ON(&construction_thread_checker_);
+ TemporaryDeregistration deregister(this, process_thread_,
+ process_thread_running_);
+
+ auto it = std::find(observers_.begin(), observers_.end(), observer);
+ if (it == observers_.end())
+ observers_.push_back(observer);
}
void CallStats::DeregisterStatsObserver(CallStatsObserver* observer) {
- rtc::CritScope cs(&crit_);
- for (std::list<CallStatsObserver*>::iterator it = observers_.begin();
- it != observers_.end(); ++it) {
- if (*it == observer) {
- observers_.erase(it);
- return;
- }
- }
+ RTC_DCHECK_RUN_ON(&construction_thread_checker_);
+ TemporaryDeregistration deregister(this, process_thread_,
+ process_thread_running_);
+ observers_.remove(observer);
+}
+
+int64_t CallStats::LastProcessedRtt() const {
+ rtc::CritScope cs(&avg_rtt_ms_lock_);
+ return avg_rtt_ms_;
}
void CallStats::OnRttUpdate(int64_t rtt) {
- rtc::CritScope cs(&crit_);
int64_t now_ms = clock_->TimeInMilliseconds();
- reports_.push_back(RttTime(rtt, now_ms));
- if (time_of_first_rtt_ms_ == -1)
- time_of_first_rtt_ms_ = now_ms;
+ process_thread_->PostTask(rtc::NewClosure([rtt, now_ms, this]() {
+ RTC_DCHECK_RUN_ON(&process_thread_checker_);
+ reports_.push_back(RttTime(rtt, now_ms));
+ if (time_of_first_rtt_ms_ == -1)
+ time_of_first_rtt_ms_ = now_ms;
+
+ process_thread_->WakeUp(this);
+ }));
}
void CallStats::UpdateHistograms() {
- rtc::CritScope cs(&crit_);
- if (time_of_first_rtt_ms_ == -1 || num_avg_rtt_ < 1)
- return;
+ RTC_DCHECK_RUN_ON(&construction_thread_checker_);
+ RTC_DCHECK(!process_thread_running_);
- int64_t elapsed_sec =
- (clock_->TimeInMilliseconds() - time_of_first_rtt_ms_) / 1000;
- if (elapsed_sec >= metrics::kMinRunTimeInSeconds) {
- int64_t avg_rtt_ms = (sum_avg_rtt_ms_ + num_avg_rtt_ / 2) / num_avg_rtt_;
- RTC_HISTOGRAM_COUNTS_10000(
- "WebRTC.Video.AverageRoundTripTimeInMilliseconds", avg_rtt_ms);
+ // The extra scope is because we have two 'dcheck run on' thread checkers.
+ // This is a special case since it's safe to access variables on the current
+ // thread that normally are only touched on the process thread.
+ // Since we're not attached to the process thread and/or the process thread
+ // isn't running, it's OK to touch these variables here.
+ {
+ // This method is called on the ctor thread (usually from the dtor, unless
+ // a test calls it). It's a requirement that the function be called when
+ // the process thread is not running (a condition that's met at destruction
+ // time), and thanks to that, we don't need a lock to synchronize against
+ // it.
+ RTC_DCHECK_RUN_ON(&process_thread_checker_);
+
+ if (time_of_first_rtt_ms_ == -1 || num_avg_rtt_ < 1)
+ return;
+
+ int64_t elapsed_sec =
+ (clock_->TimeInMilliseconds() - time_of_first_rtt_ms_) / 1000;
+ if (elapsed_sec >= metrics::kMinRunTimeInSeconds) {
+ int64_t avg_rtt_ms = (sum_avg_rtt_ms_ + num_avg_rtt_ / 2) / num_avg_rtt_;
+ RTC_HISTOGRAM_COUNTS_10000(
+ "WebRTC.Video.AverageRoundTripTimeInMilliseconds", avg_rtt_ms);
+ }
}
}
diff --git a/video/call_stats.h b/video/call_stats.h
index af5c45c..5ca44fa 100644
--- a/video/call_stats.h
+++ b/video/call_stats.h
@@ -15,35 +15,42 @@
#include <memory>
#include "modules/include/module.h"
+#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/criticalsection.h"
+#include "rtc_base/thread_checker.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
class CallStatsObserver;
-class RtcpRttStats;
// CallStats keeps track of statistics for a call.
-class CallStats : public Module {
+class CallStats : public Module, public RtcpRttStats {
public:
- friend class RtcpObserver;
+ // Time interval for updating the observers.
+ static constexpr int64_t kUpdateIntervalMs = 1000;
- explicit CallStats(Clock* clock);
+ CallStats(Clock* clock, ProcessThread* process_thread);
~CallStats();
- // Implements Module, to use the process thread.
- int64_t TimeUntilNextProcess() override;
- void Process() override;
-
- // Returns a RtcpRttStats to register at a statistics provider. The object
- // has the same lifetime as the CallStats instance.
- RtcpRttStats* rtcp_rtt_stats() const;
-
// Registers/deregisters a new observer to receive statistics updates.
+ // Must be called from the construction thread.
void RegisterStatsObserver(CallStatsObserver* observer);
void DeregisterStatsObserver(CallStatsObserver* observer);
+ // Expose |LastProcessedRtt()| from RtcpRttStats to the public interface, as
+ // it is the part of the API that is needed by direct users of CallStats.
+ // TODO(tommi): Threading or lifetime guarantees are not explicit in how
+ // CallStats is used as RtcpRttStats or how pointers are cached in a
+ // few different places (distributed via Call). It would be good to clarify
+ // from what thread/TQ calls to OnRttUpdate and LastProcessedRtt need to be
+ // allowed.
+ int64_t LastProcessedRtt() const override;
+
+ // Exposed for tests to test histogram support.
+ void UpdateHistogramsForTest() { UpdateHistograms(); }
+
// Helper struct keeping track of the time a rtt value is reported.
struct RttTime {
RttTime(int64_t new_rtt, int64_t rtt_time)
@@ -52,34 +59,62 @@
const int64_t time;
};
- protected:
- void OnRttUpdate(int64_t rtt);
-
- int64_t avg_rtt_ms() const;
-
private:
+ // RtcpRttStats implementation.
+ void OnRttUpdate(int64_t rtt) override;
+
+ // Implements Module, to use the process thread.
+ int64_t TimeUntilNextProcess() override;
+ void Process() override;
+
+ // TODO(tommi): Use this to know when we're attached to the process thread?
+ // Alternatively, inject that pointer via the ctor since the call_stats
+ // test code, isn't using a processthread atm.
+ void ProcessThreadAttached(ProcessThread* process_thread) override;
+
+ // This method must only be called when the process thread is not
+ // running, and from the construction thread.
void UpdateHistograms();
Clock* const clock_;
- // Protecting all members.
- rtc::CriticalSection crit_;
- // Observer receiving statistics updates.
- std::unique_ptr<RtcpRttStats> rtcp_rtt_stats_;
+
// The last time 'Process' resulted in statistic update.
- int64_t last_process_time_;
+ int64_t last_process_time_ RTC_GUARDED_BY(process_thread_checker_);
// The last RTT in the statistics update (zero if there is no valid estimate).
- int64_t max_rtt_ms_;
+ int64_t max_rtt_ms_ RTC_GUARDED_BY(process_thread_checker_);
+
+ // Accessed from random threads (seemingly). Consider atomic.
+ // |avg_rtt_ms_| is allowed to be read on the process thread without a lock.
+ // |avg_rtt_ms_lock_| must be held elsewhere for reading.
+ // |avg_rtt_ms_lock_| must be held on the process thread for writing.
int64_t avg_rtt_ms_;
- int64_t sum_avg_rtt_ms_ RTC_GUARDED_BY(crit_);
- int64_t num_avg_rtt_ RTC_GUARDED_BY(crit_);
- int64_t time_of_first_rtt_ms_ RTC_GUARDED_BY(crit_);
+
+ // Protects |avg_rtt_ms_|.
+ rtc::CriticalSection avg_rtt_ms_lock_;
+
+ // |sum_avg_rtt_ms_|, |num_avg_rtt_| and |time_of_first_rtt_ms_| are only used
+ // on the ProcessThread when running. When the Process Thread is not running,
+ // (and only then) they can be used in UpdateHistograms(), usually called from
+ // the dtor.
+ int64_t sum_avg_rtt_ms_ RTC_GUARDED_BY(process_thread_checker_);
+ int64_t num_avg_rtt_ RTC_GUARDED_BY(process_thread_checker_);
+ int64_t time_of_first_rtt_ms_ RTC_GUARDED_BY(process_thread_checker_);
// All Rtt reports within valid time interval, oldest first.
- std::list<RttTime> reports_;
+ std::list<RttTime> reports_ RTC_GUARDED_BY(process_thread_checker_);
// Observers getting stats reports.
+ // When attached to ProcessThread, this is read-only. In order to allow
+ // modification, we detach from the process thread while the observer
+ // list is updated, to avoid races. This allows us to not require a lock
+ // for the observers_ list, which makes the most common case lock free.
std::list<CallStatsObserver*> observers_;
+ rtc::ThreadChecker construction_thread_checker_;
+ rtc::ThreadChecker process_thread_checker_;
+ ProcessThread* const process_thread_;
+ bool process_thread_running_ RTC_GUARDED_BY(construction_thread_checker_);
+
RTC_DISALLOW_COPY_AND_ASSIGN(CallStats);
};
diff --git a/video/call_stats_unittest.cc b/video/call_stats_unittest.cc
index 989722d..33076c5 100644
--- a/video/call_stats_unittest.cc
+++ b/video/call_stats_unittest.cc
@@ -8,17 +8,23 @@
* be found in the AUTHORS file in the root of the source tree.
*/
+#include "video/call_stats.h"
+
#include <memory>
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/utility/include/process_thread.h"
+#include "rtc_base/event.h"
+#include "rtc_base/location.h"
+#include "rtc_base/task_queue.h"
#include "system_wrappers/include/metrics.h"
#include "system_wrappers/include/metrics_default.h"
#include "test/gmock.h"
#include "test/gtest.h"
-#include "video/call_stats.h"
using ::testing::_;
using ::testing::AnyNumber;
+using ::testing::InvokeWithoutArgs;
using ::testing::Return;
namespace webrtc {
@@ -33,184 +39,277 @@
class CallStatsTest : public ::testing::Test {
public:
- CallStatsTest() : fake_clock_(12345) {}
+ CallStatsTest() {
+ process_thread_->RegisterModule(&call_stats_, RTC_FROM_HERE);
+ process_thread_->Start();
+ }
+ ~CallStatsTest() override {
+ process_thread_->Stop();
+ process_thread_->DeRegisterModule(&call_stats_);
+ }
protected:
- virtual void SetUp() { call_stats_.reset(new CallStats(&fake_clock_)); }
- SimulatedClock fake_clock_;
- std::unique_ptr<CallStats> call_stats_;
+ std::unique_ptr<ProcessThread> process_thread_{
+ ProcessThread::Create("CallStats")};
+ SimulatedClock fake_clock_{12345};
+ CallStats call_stats_{&fake_clock_, process_thread_.get()};
};
TEST_F(CallStatsTest, AddAndTriggerCallback) {
+ rtc::Event event(false, false);
+
+ static constexpr const int64_t kRtt = 25;
+
MockStatsObserver stats_observer;
- RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats();
- call_stats_->RegisterStatsObserver(&stats_observer);
- fake_clock_.AdvanceTimeMilliseconds(1000);
+ EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([&event] { event.Set(); }));
+
+ RtcpRttStats* rtcp_rtt_stats = &call_stats_;
+ call_stats_.RegisterStatsObserver(&stats_observer);
EXPECT_EQ(-1, rtcp_rtt_stats->LastProcessedRtt());
- const int64_t kRtt = 25;
rtcp_rtt_stats->OnRttUpdate(kRtt);
- EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt)).Times(1);
- call_stats_->Process();
+
+ EXPECT_TRUE(event.Wait(1000));
EXPECT_EQ(kRtt, rtcp_rtt_stats->LastProcessedRtt());
- const int64_t kRttTimeOutMs = 1500 + 10;
- fake_clock_.AdvanceTimeMilliseconds(kRttTimeOutMs);
- EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(0);
- call_stats_->Process();
- EXPECT_EQ(-1, rtcp_rtt_stats->LastProcessedRtt());
-
- call_stats_->DeregisterStatsObserver(&stats_observer);
+ call_stats_.DeregisterStatsObserver(&stats_observer);
}
TEST_F(CallStatsTest, ProcessTime) {
+ rtc::Event event(false, false);
+
+ static constexpr const int64_t kRtt = 100;
+ static constexpr const int64_t kRtt2 = 80;
+
+ RtcpRttStats* rtcp_rtt_stats = &call_stats_;
+
MockStatsObserver stats_observer;
- call_stats_->RegisterStatsObserver(&stats_observer);
- RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats();
- rtcp_rtt_stats->OnRttUpdate(100);
- // Time isn't updated yet.
- EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(0);
- call_stats_->Process();
+ EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt))
+ .Times(2)
+ .WillOnce(InvokeWithoutArgs([this] {
+ // Advance clock and verify we get an update.
+ fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs);
+ }))
+ .WillRepeatedly(InvokeWithoutArgs([this, rtcp_rtt_stats] {
+ rtcp_rtt_stats->OnRttUpdate(kRtt2);
+ // Advance clock just too little to get an update.
+ fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs - 1);
+ }));
- // Advance clock and verify we get an update.
- fake_clock_.AdvanceTimeMilliseconds(1000);
- EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(1);
- call_stats_->Process();
+ // In case you're reading this and wondering how this number is arrived at,
+ // please see comments in the ChangeRtt test that go into some detail.
+ static constexpr const int64_t kLastAvg = 94;
+ EXPECT_CALL(stats_observer, OnRttUpdate(kLastAvg, kRtt2))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([&event] { event.Set(); }));
- // Advance clock just too little to get an update.
- fake_clock_.AdvanceTimeMilliseconds(999);
- rtcp_rtt_stats->OnRttUpdate(100);
- EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(0);
- call_stats_->Process();
+ call_stats_.RegisterStatsObserver(&stats_observer);
- // Advance enough to trigger a new update.
- fake_clock_.AdvanceTimeMilliseconds(1);
- EXPECT_CALL(stats_observer, OnRttUpdate(_, _)).Times(1);
- call_stats_->Process();
+ rtcp_rtt_stats->OnRttUpdate(kRtt);
+ EXPECT_TRUE(event.Wait(1000));
- call_stats_->DeregisterStatsObserver(&stats_observer);
+ call_stats_.DeregisterStatsObserver(&stats_observer);
}
// Verify all observers get correct estimates and observers can be added and
// removed.
TEST_F(CallStatsTest, MultipleObservers) {
MockStatsObserver stats_observer_1;
- call_stats_->RegisterStatsObserver(&stats_observer_1);
+ call_stats_.RegisterStatsObserver(&stats_observer_1);
// Add the second observer twice, there should still be only one report to the
// observer.
MockStatsObserver stats_observer_2;
- call_stats_->RegisterStatsObserver(&stats_observer_2);
- call_stats_->RegisterStatsObserver(&stats_observer_2);
+ call_stats_.RegisterStatsObserver(&stats_observer_2);
+ call_stats_.RegisterStatsObserver(&stats_observer_2);
- RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats();
- const int64_t kRtt = 100;
- rtcp_rtt_stats->OnRttUpdate(kRtt);
+ RtcpRttStats* rtcp_rtt_stats = &call_stats_;
+ static constexpr const int64_t kRtt = 100;
// Verify both observers are updated.
- fake_clock_.AdvanceTimeMilliseconds(1000);
- EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)).Times(1);
- EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(1);
- call_stats_->Process();
+ rtc::Event ev1(false, false);
+ rtc::Event ev2(false, false);
+ EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt))
+ .Times(AnyNumber())
+ .WillOnce(InvokeWithoutArgs([&ev1] { ev1.Set(); }))
+ .WillRepeatedly(Return());
+ EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt))
+ .Times(AnyNumber())
+ .WillOnce(InvokeWithoutArgs([&ev2] { ev2.Set(); }))
+ .WillRepeatedly(Return());
+ rtcp_rtt_stats->OnRttUpdate(kRtt);
+ ASSERT_TRUE(ev1.Wait(100));
+ ASSERT_TRUE(ev2.Wait(100));
// Deregister the second observer and verify update is only sent to the first
// observer.
- call_stats_->DeregisterStatsObserver(&stats_observer_2);
- rtcp_rtt_stats->OnRttUpdate(kRtt);
- fake_clock_.AdvanceTimeMilliseconds(1000);
- EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)).Times(1);
+ call_stats_.DeregisterStatsObserver(&stats_observer_2);
+
+ EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt))
+ .Times(AnyNumber())
+ .WillOnce(InvokeWithoutArgs([&ev1] { ev1.Set(); }))
+ .WillRepeatedly(Return());
EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(0);
- call_stats_->Process();
+ rtcp_rtt_stats->OnRttUpdate(kRtt);
+ ASSERT_TRUE(ev1.Wait(100));
// Deregister the first observer.
- call_stats_->DeregisterStatsObserver(&stats_observer_1);
- rtcp_rtt_stats->OnRttUpdate(kRtt);
- fake_clock_.AdvanceTimeMilliseconds(1000);
+ call_stats_.DeregisterStatsObserver(&stats_observer_1);
+
+ // Now make sure we don't get any callbacks.
EXPECT_CALL(stats_observer_1, OnRttUpdate(kRtt, kRtt)).Times(0);
EXPECT_CALL(stats_observer_2, OnRttUpdate(kRtt, kRtt)).Times(0);
- call_stats_->Process();
+ rtcp_rtt_stats->OnRttUpdate(kRtt);
+
+ // Force a call to Process().
+ process_thread_->WakeUp(&call_stats_);
+
+ // Flush the queue on the process thread to make sure we return after
+ // Process() has been called.
+ rtc::Event event(false, false);
+ process_thread_->PostTask(rtc::NewClosure([&event]() { event.Set(); }));
+ event.Wait(rtc::Event::kForever);
}
// Verify increasing and decreasing rtt triggers callbacks with correct values.
TEST_F(CallStatsTest, ChangeRtt) {
+ // TODO(tommi): This test assumes things about how old reports are removed
+ // inside of call_stats.cc. The threshold ms value is 1500ms, but it's not
+ // clear here that how the clock is advanced, affects that algorithm and
+ // subsequently the average reported rtt.
+
MockStatsObserver stats_observer;
- call_stats_->RegisterStatsObserver(&stats_observer);
- RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats();
+ call_stats_.RegisterStatsObserver(&stats_observer);
+ RtcpRttStats* rtcp_rtt_stats = &call_stats_;
- // Advance clock to be ready for an update.
- fake_clock_.AdvanceTimeMilliseconds(1000);
+ rtc::Event event(false, false);
- // Set a first value and verify the callback is triggered.
- const int64_t kFirstRtt = 100;
- rtcp_rtt_stats->OnRttUpdate(kFirstRtt);
- EXPECT_CALL(stats_observer, OnRttUpdate(kFirstRtt, kFirstRtt)).Times(1);
- call_stats_->Process();
+ static constexpr const int64_t kFirstRtt = 100;
+ static constexpr const int64_t kLowRtt = kFirstRtt - 20;
+ static constexpr const int64_t kHighRtt = kFirstRtt + 20;
- // Increase rtt and verify the new value is reported.
- fake_clock_.AdvanceTimeMilliseconds(1000);
- const int64_t kHighRtt = kFirstRtt + 20;
- const int64_t kAvgRtt1 = 103;
- rtcp_rtt_stats->OnRttUpdate(kHighRtt);
- EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kHighRtt)).Times(1);
- call_stats_->Process();
+ EXPECT_CALL(stats_observer, OnRttUpdate(kFirstRtt, kFirstRtt))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([&rtcp_rtt_stats, this] {
+ fake_clock_.AdvanceTimeMilliseconds(1000);
+ rtcp_rtt_stats->OnRttUpdate(kHighRtt); // Reported at T1 (1000ms).
+ }));
+
+ // TODO(tommi): This relies on the internal algorithms of call_stats.cc.
+ // There's a weight factor there (0.3), that weighs the previous average to
+ // the new one by 70%, so the number 103 in this case is arrived at like so:
+ // (100) / 1 * 0.7 + (100+120)/2 * 0.3 = 103
+ static constexpr const int64_t kAvgRtt1 = 103;
+ EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kHighRtt))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([&rtcp_rtt_stats, this] {
+ // This interacts with an internal implementation detail in call_stats
+ // that decays the oldest rtt value. See more below.
+ fake_clock_.AdvanceTimeMilliseconds(1000);
+ rtcp_rtt_stats->OnRttUpdate(kLowRtt); // Reported at T2 (2000ms).
+ }));
// Increase time enough for a new update, but not too much to make the
// rtt invalid. Report a lower rtt and verify the old/high value still is sent
// in the callback.
- fake_clock_.AdvanceTimeMilliseconds(1000);
- const int64_t kLowRtt = kFirstRtt - 20;
- const int64_t kAvgRtt2 = 102;
- rtcp_rtt_stats->OnRttUpdate(kLowRtt);
- EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kHighRtt)).Times(1);
- call_stats_->Process();
- // Advance time to make the high report invalid, the lower rtt should now be
- // in the callback.
- fake_clock_.AdvanceTimeMilliseconds(1000);
- const int64_t kAvgRtt3 = 95;
- EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt3, kLowRtt)).Times(1);
- call_stats_->Process();
+ // Here, enough time must have passed in order to remove exactly the first
+ // report and nothing else (>1500ms has passed since the first rtt).
+ // So, this value is arrived by doing:
+ // (kAvgRtt1)/1 * 0.7 + (kHighRtt+kLowRtt)/2 * 0.3 = 102.1
+ static constexpr const int64_t kAvgRtt2 = 102;
+ EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kHighRtt))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([this] {
+ // Advance time to make the high report invalid, the lower rtt should
+ // now be in the callback.
+ fake_clock_.AdvanceTimeMilliseconds(1000);
+ }));
- call_stats_->DeregisterStatsObserver(&stats_observer);
+ static constexpr const int64_t kAvgRtt3 = 95;
+ EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt3, kLowRtt))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([&event] { event.Set(); }));
+
+ // Trigger the first rtt value and set off the chain of callbacks.
+ rtcp_rtt_stats->OnRttUpdate(kFirstRtt); // Reported at T0 (0ms).
+ EXPECT_TRUE(event.Wait(1000));
+
+ call_stats_.DeregisterStatsObserver(&stats_observer);
}
TEST_F(CallStatsTest, LastProcessedRtt) {
+ rtc::Event event(false, false);
MockStatsObserver stats_observer;
- call_stats_->RegisterStatsObserver(&stats_observer);
- RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats();
- fake_clock_.AdvanceTimeMilliseconds(1000);
+ call_stats_.RegisterStatsObserver(&stats_observer);
+ RtcpRttStats* rtcp_rtt_stats = &call_stats_;
+
+ static constexpr const int64_t kRttLow = 10;
+ static constexpr const int64_t kRttHigh = 30;
+ // The following two average numbers dependend on average + weight
+ // calculations in call_stats.cc.
+ static constexpr const int64_t kAvgRtt1 = 13;
+ static constexpr const int64_t kAvgRtt2 = 15;
+
+ EXPECT_CALL(stats_observer, OnRttUpdate(kRttLow, kRttLow))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([rtcp_rtt_stats] {
+ EXPECT_EQ(kRttLow, rtcp_rtt_stats->LastProcessedRtt());
+ // Don't advance the clock to make sure that low and high rtt values
+ // are associated with the same time stamp.
+ rtcp_rtt_stats->OnRttUpdate(kRttHigh);
+ }));
+
+ EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt1, kRttHigh))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([rtcp_rtt_stats, this] {
+ EXPECT_EQ(kAvgRtt1, rtcp_rtt_stats->LastProcessedRtt());
+ fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs);
+ rtcp_rtt_stats->OnRttUpdate(kRttLow);
+ rtcp_rtt_stats->OnRttUpdate(kRttHigh);
+ }));
+
+ EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt2, kRttHigh))
+ .Times(1)
+ .WillOnce(InvokeWithoutArgs([rtcp_rtt_stats, &event] {
+ EXPECT_EQ(kAvgRtt2, rtcp_rtt_stats->LastProcessedRtt());
+ event.Set();
+ }));
// Set a first values and verify that LastProcessedRtt initially returns the
// average rtt.
- const int64_t kRttLow = 10;
- const int64_t kRttHigh = 30;
- const int64_t kAvgRtt = 20;
+ fake_clock_.AdvanceTimeMilliseconds(CallStats::kUpdateIntervalMs);
rtcp_rtt_stats->OnRttUpdate(kRttLow);
- rtcp_rtt_stats->OnRttUpdate(kRttHigh);
- EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt, kRttHigh)).Times(1);
- call_stats_->Process();
- EXPECT_EQ(kAvgRtt, rtcp_rtt_stats->LastProcessedRtt());
+ EXPECT_TRUE(event.Wait(1000));
+ EXPECT_EQ(kAvgRtt2, rtcp_rtt_stats->LastProcessedRtt());
- // Update values and verify LastProcessedRtt.
- fake_clock_.AdvanceTimeMilliseconds(1000);
- rtcp_rtt_stats->OnRttUpdate(kRttLow);
- rtcp_rtt_stats->OnRttUpdate(kRttHigh);
- EXPECT_CALL(stats_observer, OnRttUpdate(kAvgRtt, kRttHigh)).Times(1);
- call_stats_->Process();
- EXPECT_EQ(kAvgRtt, rtcp_rtt_stats->LastProcessedRtt());
-
- call_stats_->DeregisterStatsObserver(&stats_observer);
+ call_stats_.DeregisterStatsObserver(&stats_observer);
}
TEST_F(CallStatsTest, ProducesHistogramMetrics) {
metrics::Reset();
- const int64_t kRtt = 123;
- RtcpRttStats* rtcp_rtt_stats = call_stats_->rtcp_rtt_stats();
+ rtc::Event event(false, false);
+ static constexpr const int64_t kRtt = 123;
+ RtcpRttStats* rtcp_rtt_stats = &call_stats_;
+ MockStatsObserver stats_observer;
+ call_stats_.RegisterStatsObserver(&stats_observer);
+ EXPECT_CALL(stats_observer, OnRttUpdate(kRtt, kRtt))
+ .Times(AnyNumber())
+ .WillOnce(InvokeWithoutArgs([&event] { event.Set(); }))
+ .WillRepeatedly(Return());
+
rtcp_rtt_stats->OnRttUpdate(kRtt);
- fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds * 1000);
+ fake_clock_.AdvanceTimeMilliseconds(metrics::kMinRunTimeInSeconds *
+ CallStats::kUpdateIntervalMs);
rtcp_rtt_stats->OnRttUpdate(kRtt);
- call_stats_->Process();
- call_stats_.reset();
+ EXPECT_TRUE(event.Wait(1000));
+
+ call_stats_.DeregisterStatsObserver(&stats_observer);
+
+ process_thread_->Stop();
+ call_stats_.UpdateHistogramsForTest();
EXPECT_EQ(1, metrics::NumSamples(
"WebRTC.Video.AverageRoundTripTimeInMilliseconds"));
diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc
index 4b9bdf6..a632167 100644
--- a/video/video_receive_stream.cc
+++ b/video/video_receive_stream.cc
@@ -101,7 +101,7 @@
video_receiver_(clock_, nullptr, this, timing_.get(), this, this),
stats_proxy_(&config_, clock_),
rtp_video_stream_receiver_(&transport_adapter_,
- call_stats_->rtcp_rtt_stats(),
+ call_stats,
packet_router,
&config_,
rtp_receive_statistics_.get(),
diff --git a/video/video_receive_stream_unittest.cc b/video/video_receive_stream_unittest.cc
index 97a447b..e76c3a9 100644
--- a/video/video_receive_stream_unittest.cc
+++ b/video/video_receive_stream_unittest.cc
@@ -67,10 +67,10 @@
class VideoReceiveStreamTest : public testing::Test {
public:
VideoReceiveStreamTest()
- : override_field_trials_(kNewJitterBufferFieldTrialEnabled),
+ : process_thread_(ProcessThread::Create("TestThread")),
+ override_field_trials_(kNewJitterBufferFieldTrialEnabled),
config_(&mock_transport_),
- call_stats_(Clock::GetRealTimeClock()),
- process_thread_(ProcessThread::Create("TestThread")) {}
+ call_stats_(Clock::GetRealTimeClock(), process_thread_.get()) {}
void SetUp() {
constexpr int kDefaultNumCpuCores = 2;
@@ -96,6 +96,7 @@
}
protected:
+ std::unique_ptr<ProcessThread> process_thread_;
webrtc::test::ScopedFieldTrials override_field_trials_;
VideoReceiveStream::Config config_;
CallStats call_stats_;
@@ -104,7 +105,6 @@
cricket::FakeVideoRenderer fake_renderer_;
MockTransport mock_transport_;
PacketRouter packet_router_;
- std::unique_ptr<ProcessThread> process_thread_;
RtpStreamReceiverController rtp_stream_receiver_controller_;
std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
};
diff --git a/video/video_send_stream.cc b/video/video_send_stream.cc
index 7ca8961..a32917a 100644
--- a/video/video_send_stream.cc
+++ b/video/video_send_stream.cc
@@ -721,7 +721,7 @@
&encoder_feedback_,
bandwidth_observer_,
transport,
- call_stats_->rtcp_rtt_stats(),
+ call_stats,
flexfec_sender_.get(),
stats_proxy_,
send_delay_stats,