Don't boost pacing rate after pause.

The pacer has a mechanism to make sure all packets are sent within some
time limit. This is based on the average queue time of the packets in
the pacer queue.

If the pacer is paused while packets are still in the queue (for
instance if the underlying transport goes down temporarily), on resume
all those packets might be past the time limit and thus will all be
burst out onto the network in a tight loop.

This CL subtracts pause time from the queue time, effectively pausing
the clock for the queue while the pacer is paused, so that when we
resume the pacing bitrate will be the same as when we paused.

BUG=webrtc:7694

Review-Url: https://codereview.webrtc.org/2994323002
Cr-Commit-Position: refs/heads/master@{#19367}
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc
index bf1c6ec..2442bf0 100644
--- a/webrtc/modules/pacing/paced_sender.cc
+++ b/webrtc/modules/pacing/paced_sender.cc
@@ -55,6 +55,7 @@
         sequence_number(seq_number),
         capture_time_ms(capture_time_ms),
         enqueue_time_ms(enqueue_time_ms),
+        sum_paused_ms(0),
         bytes(length_in_bytes),
         retransmission(retransmission),
         enqueue_order(enqueue_order) {}
@@ -62,8 +63,9 @@
   RtpPacketSender::Priority priority;
   uint32_t ssrc;
   uint16_t sequence_number;
-  int64_t capture_time_ms;
-  int64_t enqueue_time_ms;
+  int64_t capture_time_ms;  // Absolute time of frame capture.
+  int64_t enqueue_time_ms;  // Absolute time of pacer queue entry.
+  int64_t sum_paused_ms;    // Sum of time spent in queue while pacer is paused.
   size_t bytes;
   bool retransmission;
   uint64_t enqueue_order;
@@ -96,7 +98,8 @@
       : bytes_(0),
         clock_(clock),
         queue_time_sum_(0),
-        time_last_updated_(clock_->TimeInMilliseconds()) {}
+        time_last_updated_(clock_->TimeInMilliseconds()),
+        paused_(false) {}
   virtual ~PacketQueue() {}
 
   void Push(const Packet& packet) {
@@ -126,7 +129,11 @@
   void FinalizePop(const Packet& packet) {
     RemoveFromDupeSet(packet);
     bytes_ -= packet.bytes;
-    queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
+    int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
+    RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
+    packet_queue_time_ms -= packet.sum_paused_ms;
+    RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
+    queue_time_sum_ -= packet_queue_time_ms;
     packet_list_.erase(packet.this_it);
     RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
     if (packet_list_.empty())
@@ -148,14 +155,34 @@
 
   void UpdateQueueTime(int64_t timestamp_ms) {
     RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
-    int64_t delta = timestamp_ms - time_last_updated_;
-    // Use packet packet_list_.size() not prio_queue_.size() here, as there
-    // might be an outstanding element popped from prio_queue_ currently in the
-    // SendPacket() call, while packet_list_ will always be correct.
-    queue_time_sum_ += delta * packet_list_.size();
+    if (timestamp_ms == time_last_updated_)
+      return;
+
+    int64_t delta_ms = timestamp_ms - time_last_updated_;
+
+    if (paused_) {
+      // Increase per-packet accumulators of time spent in queue while paused,
+      // so that we can disregard that when subtracting main accumulator when
+      // popping packet from the queue.
+      for (auto& it : packet_list_) {
+        it.sum_paused_ms += delta_ms;
+      }
+    } else {
+      // Use packet packet_list_.size() not prio_queue_.size() here, as there
+      // might be an outstanding element popped from prio_queue_ currently in
+      // the SendPacket() call, while packet_list_ will always be correct.
+      queue_time_sum_ += delta_ms * packet_list_.size();
+    }
     time_last_updated_ = timestamp_ms;
   }
 
+  void SetPauseState(bool paused, int64_t timestamp_ms) {
+    if (paused_ == paused)
+      return;
+    UpdateQueueTime(timestamp_ms);
+    paused_ = paused;
+  }
+
   int64_t AverageQueueTimeMs() const {
     if (prio_queue_.empty())
       return 0;
@@ -200,6 +227,7 @@
   const Clock* const clock_;
   int64_t queue_time_sum_;
   int64_t time_last_updated_;
+  bool paused_;
 };
 
 }  // namespace paced_sender
@@ -243,6 +271,7 @@
   {
     rtc::CritScope cs(&critsect_);
     paused_ = true;
+    packets_->SetPauseState(true, clock_->TimeInMilliseconds());
   }
   // Tell the process thread to call our TimeUntilNextProcess() method to get
   // a new (longer) estimate for when to call Process().
@@ -255,6 +284,7 @@
   {
     rtc::CritScope cs(&critsect_);
     paused_ = false;
+    packets_->SetPauseState(false, clock_->TimeInMilliseconds());
   }
   // Tell the process thread to call our TimeUntilNextProcess() method to
   // refresh the estimate for when to call Process().
diff --git a/webrtc/modules/pacing/paced_sender.h b/webrtc/modules/pacing/paced_sender.h
index 1d38d09..a1f7ebe 100644
--- a/webrtc/modules/pacing/paced_sender.h
+++ b/webrtc/modules/pacing/paced_sender.h
@@ -136,7 +136,8 @@
   virtual rtc::Optional<int64_t> GetApplicationLimitedRegionStartTime() const;
 
   // Returns the average time since being enqueued, in milliseconds, for all
-  // packets currently in the pacer queue, or 0 if queue is empty.
+  // packets currently in the pacer queue, excluding any time the pacer has been
+  // paused. Returns 0 if queue is empty.
   virtual int64_t AverageQueueTimeMs();
 
   // Returns the number of milliseconds until the module want a worker thread
diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc
index 5b81462..0b2ac1c 100644
--- a/webrtc/modules/pacing/paced_sender_unittest.cc
+++ b/webrtc/modules/pacing/paced_sender_unittest.cc
@@ -1094,5 +1094,69 @@
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
 }
 
+TEST_F(PacedSenderTest, QueueTimeWithPause) {
+  const size_t kPacketSize = 1200;
+  const uint32_t kSsrc = 12346;
+  uint16_t sequence_number = 1234;
+
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+      sequence_number++, clock_.TimeInMilliseconds(),
+      kPacketSize, false);
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+      sequence_number++, clock_.TimeInMilliseconds(),
+      kPacketSize, false);
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  send_bucket_->Pause();
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  // In paused state, queue time should not increase.
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  send_bucket_->Resume();
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(200, send_bucket_->AverageQueueTimeMs());
+}
+
+TEST_F(PacedSenderTest, QueueTimePausedDuringPush) {
+  const size_t kPacketSize = 1200;
+  const uint32_t kSsrc = 12346;
+  uint16_t sequence_number = 1234;
+
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+      sequence_number++, clock_.TimeInMilliseconds(),
+      kPacketSize, false);
+  clock_.AdvanceTimeMilliseconds(100);
+  send_bucket_->Pause();
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs());
+
+  // Add a new packet during paused phase.
+  send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc,
+      sequence_number++, clock_.TimeInMilliseconds(),
+      kPacketSize, false);
+  // From a queue time perspective, packet inserted during pause will have zero
+  // queue time. Average queue time will then be (0 + 100) / 2 = 50.
+  EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs());
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs());
+
+  send_bucket_->Resume();
+  EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs());
+
+  clock_.AdvanceTimeMilliseconds(100);
+  EXPECT_EQ(150, send_bucket_->AverageQueueTimeMs());
+}
+
+// TODO(sprang): Extract PacketQueue from PacedSender so that we can test
+// removing elements while paused. (This is possible, but only because of semi-
+// racy condition so can't easily be tested).
+
 }  // namespace test
 }  // namespace webrtc