Support more ssrcs in ReceiveStatistics than retrieved per RtcpReportBlocks call

Bug: webrtc:8239
Change-Id: Ie2d630e98384e640e0e7dcbfbb1f69453d873044
Reviewed-on: https://webrtc-review.googlesource.com/39784
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#21635}
diff --git a/modules/rtp_rtcp/source/receive_statistics_impl.cc b/modules/rtp_rtcp/source/receive_statistics_impl.cc
index 4f956a9..0e40aac 100644
--- a/modules/rtp_rtcp/source/receive_statistics_impl.cc
+++ b/modules/rtp_rtcp/source/receive_statistics_impl.cc
@@ -359,6 +359,7 @@
 
 ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
     : clock_(clock),
+      last_returned_ssrc_(0),
       rtcp_stats_callback_(NULL),
       rtp_stats_callback_(NULL) {}
 
@@ -467,29 +468,35 @@
   }
   std::vector<rtcp::ReportBlock> result;
   result.reserve(std::min(max_blocks, statisticians.size()));
-  for (auto& statistician : statisticians) {
-    // TODO(danilchap): Select statistician subset across multiple calls using
-    // round-robin, as described in rfc3550 section 6.4 when single
-    // rtcp_module/receive_statistics will be used for more rtp streams.
-    if (result.size() == max_blocks)
-      break;
-
+  auto add_report_block = [&result](uint32_t media_ssrc,
+                                    StreamStatisticianImpl* statistician) {
     // Do we have receive statistics to send?
     RtcpStatistics stats;
-    if (!statistician.second->GetActiveStatisticsAndReset(&stats))
-      continue;
+    if (!statistician->GetActiveStatisticsAndReset(&stats))
+      return;
     result.emplace_back();
     rtcp::ReportBlock& block = result.back();
-    block.SetMediaSsrc(statistician.first);
+    block.SetMediaSsrc(media_ssrc);
     block.SetFractionLost(stats.fraction_lost);
     if (!block.SetCumulativeLost(stats.packets_lost)) {
       RTC_LOG(LS_WARNING) << "Cumulative lost is oversized.";
       result.pop_back();
-      continue;
+      return;
     }
     block.SetExtHighestSeqNum(stats.extended_highest_sequence_number);
     block.SetJitter(stats.jitter);
-  }
+  };
+
+  const auto start_it = statisticians.upper_bound(last_returned_ssrc_);
+  for (auto it = start_it;
+       result.size() < max_blocks && it != statisticians.end(); ++it)
+    add_report_block(it->first, it->second);
+  for (auto it = statisticians.begin();
+       result.size() < max_blocks && it != start_it; ++it)
+    add_report_block(it->first, it->second);
+
+  if (!result.empty())
+    last_returned_ssrc_ = result.back().source_ssrc();
   return result;
 }
 
diff --git a/modules/rtp_rtcp/source/receive_statistics_impl.h b/modules/rtp_rtcp/source/receive_statistics_impl.h
index f0749a3..1f827f6 100644
--- a/modules/rtp_rtcp/source/receive_statistics_impl.h
+++ b/modules/rtp_rtcp/source/receive_statistics_impl.h
@@ -125,6 +125,7 @@
 
   Clock* const clock_;
   rtc::CriticalSection receive_statistics_lock_;
+  uint32_t last_returned_ssrc_;
   std::map<uint32_t, StreamStatisticianImpl*> statisticians_;
 
   RtcpStatisticsCallback* rtcp_stats_callback_;
diff --git a/modules/rtp_rtcp/source/receive_statistics_unittest.cc b/modules/rtp_rtcp/source/receive_statistics_unittest.cc
index 034be7d..001ae2e 100644
--- a/modules/rtp_rtcp/source/receive_statistics_unittest.cc
+++ b/modules/rtp_rtcp/source/receive_statistics_unittest.cc
@@ -9,6 +9,7 @@
  */
 
 #include <memory>
+#include <vector>
 
 #include "modules/rtp_rtcp/include/receive_statistics.h"
 #include "system_wrappers/include/clock.h"
@@ -16,23 +17,33 @@
 #include "test/gtest.h"
 
 namespace webrtc {
+namespace {
+
+using ::testing::SizeIs;
+using ::testing::UnorderedElementsAre;
 
 const size_t kPacketSize1 = 100;
 const size_t kPacketSize2 = 300;
-const uint32_t kSsrc1 = 1;
-const uint32_t kSsrc2 = 2;
+const uint32_t kSsrc1 = 101;
+const uint32_t kSsrc2 = 202;
+const uint32_t kSsrc3 = 203;
+const uint32_t kSsrc4 = 304;
+
+RTPHeader CreateRtpHeader(uint32_t ssrc) {
+  RTPHeader header;
+  memset(&header, 0, sizeof(header));
+  header.ssrc = ssrc;
+  header.sequenceNumber = 100;
+  return header;
+}
 
 class ReceiveStatisticsTest : public ::testing::Test {
  public:
   ReceiveStatisticsTest() :
       clock_(0),
       receive_statistics_(ReceiveStatistics::Create(&clock_)) {
-    memset(&header1_, 0, sizeof(header1_));
-    header1_.ssrc = kSsrc1;
-    header1_.sequenceNumber = 100;
-    memset(&header2_, 0, sizeof(header2_));
-    header2_.ssrc = kSsrc2;
-    header2_.sequenceNumber = 100;
+    header1_ = CreateRtpHeader(kSsrc1);
+    header2_ = CreateRtpHeader(kSsrc2);
   }
 
  protected:
@@ -89,6 +100,47 @@
   EXPECT_EQ(3u, packets_received);
 }
 
+TEST_F(ReceiveStatisticsTest,
+       RtcpReportBlocksReturnsMaxBlocksWhenThereAreMoreStatisticians) {
+  RTPHeader header1 = CreateRtpHeader(kSsrc1);
+  RTPHeader header2 = CreateRtpHeader(kSsrc2);
+  RTPHeader header3 = CreateRtpHeader(kSsrc3);
+  receive_statistics_->IncomingPacket(header1, kPacketSize1, false);
+  receive_statistics_->IncomingPacket(header2, kPacketSize1, false);
+  receive_statistics_->IncomingPacket(header3, kPacketSize1, false);
+
+  EXPECT_THAT(receive_statistics_->RtcpReportBlocks(2), SizeIs(2));
+  EXPECT_THAT(receive_statistics_->RtcpReportBlocks(2), SizeIs(2));
+  EXPECT_THAT(receive_statistics_->RtcpReportBlocks(2), SizeIs(2));
+}
+
+TEST_F(ReceiveStatisticsTest,
+       RtcpReportBlocksReturnsAllObservedSsrcsWithMultipleCalls) {
+  RTPHeader header1 = CreateRtpHeader(kSsrc1);
+  RTPHeader header2 = CreateRtpHeader(kSsrc2);
+  RTPHeader header3 = CreateRtpHeader(kSsrc3);
+  RTPHeader header4 = CreateRtpHeader(kSsrc4);
+  receive_statistics_->IncomingPacket(header1, kPacketSize1, false);
+  receive_statistics_->IncomingPacket(header2, kPacketSize1, false);
+  receive_statistics_->IncomingPacket(header3, kPacketSize1, false);
+  receive_statistics_->IncomingPacket(header4, kPacketSize1, false);
+
+  std::vector<uint32_t> observed_ssrcs;
+  std::vector<rtcp::ReportBlock> report_blocks =
+      receive_statistics_->RtcpReportBlocks(2);
+  ASSERT_THAT(report_blocks, SizeIs(2));
+  observed_ssrcs.push_back(report_blocks[0].source_ssrc());
+  observed_ssrcs.push_back(report_blocks[1].source_ssrc());
+
+  report_blocks = receive_statistics_->RtcpReportBlocks(2);
+  ASSERT_THAT(report_blocks, SizeIs(2));
+  observed_ssrcs.push_back(report_blocks[0].source_ssrc());
+  observed_ssrcs.push_back(report_blocks[1].source_ssrc());
+
+  EXPECT_THAT(observed_ssrcs,
+              UnorderedElementsAre(kSsrc1, kSsrc2, kSsrc3, kSsrc4));
+}
+
 TEST_F(ReceiveStatisticsTest, ActiveStatisticians) {
   receive_statistics_->IncomingPacket(header1_, kPacketSize1, false);
   ++header1_.sequenceNumber;
@@ -367,4 +419,6 @@
   expected.fec.packets = 1;
   callback.Matches(2, kSsrc1, expected);
 }
+
+}  // namespace
 }  // namespace webrtc