Add GetSctpStats to PeerConnectionInternal, remove sctp_data_channels()

This removes code from DataChannelController that exposes
an internal vector of data channels and puts the onus of
returning stats for a data channel, on the data channel
object itself. This will come in handy as we make threading
changes to the data channel object.

Change-Id: Ie164cc5823cd5f9782fc5c9a63aa4c76b8229639
Bug: webrtc:11547, webrtc:11687
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177244
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31533}
diff --git a/pc/data_channel.cc b/pc/data_channel.cc
index e4f658c..2265510 100644
--- a/pc/data_channel.cc
+++ b/pc/data_channel.cc
@@ -315,7 +315,6 @@
   // thread. Bring buffer management etc to the network thread and keep the
   // operational state management on the signaling thread.
 
-  buffered_amount_ += buffer.size();
   if (state_ != kOpen) {
     return false;
   }
@@ -327,6 +326,8 @@
     return true;
   }
 
+  buffered_amount_ += buffer.size();
+
   // If the queue is non-empty, we're waiting for SignalReadyToSend,
   // so just add to the end of the queue and keep waiting.
   if (!queued_send_data_.Empty()) {
@@ -429,6 +430,14 @@
   CloseAbruptlyWithError(std::move(error));
 }
 
+DataChannel::Stats DataChannel::GetStats() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  Stats stats{internal_id_,        id(),         label(),
+              protocol(),          state(),      messages_sent(),
+              messages_received(), bytes_sent(), bytes_received()};
+  return stats;
+}
+
 // The remote peer request that this channel shall be closed.
 void DataChannel::RemotePeerRequestClose() {
   RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
diff --git a/pc/data_channel.h b/pc/data_channel.h
index e843250..c1e855d 100644
--- a/pc/data_channel.h
+++ b/pc/data_channel.h
@@ -113,6 +113,18 @@
 //    callback and transition to kClosed.
 class DataChannel : public DataChannelInterface, public sigslot::has_slots<> {
  public:
+  struct Stats {
+    int internal_id;
+    int id;
+    std::string label;
+    std::string protocol;
+    DataState state;
+    uint32_t messages_sent;
+    uint32_t messages_received;
+    uint64_t bytes_sent;
+    uint64_t bytes_received;
+  };
+
   static rtc::scoped_refptr<DataChannel> Create(
       DataChannelProviderInterface* provider,
       cricket::DataChannelType dct,
@@ -205,6 +217,8 @@
   // to kClosed.
   void OnTransportChannelClosed();
 
+  Stats GetStats() const;
+
   /*******************************************
    * The following methods are for RTP only. *
    *******************************************/
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 9891d50..a8a1491 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -174,6 +174,11 @@
 void DataChannelController::SetupDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
   data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>();
+
+  // There's a new data channel transport.  This needs to be signaled to the
+  // |sctp_data_channels_| so that they can reopen and reconnect.  This is
+  // necessary when bundling is applied.
+  NotifyDataChannelsOfTransportCreated();
 }
 
 void DataChannelController::TeardownDataChannelTransport_n() {
@@ -200,17 +205,21 @@
       // There's a new data channel transport.  This needs to be signaled to the
       // |sctp_data_channels_| so that they can reopen and reconnect.  This is
       // necessary when bundling is applied.
-      data_channel_transport_invoker_->AsyncInvoke<void>(
-          RTC_FROM_HERE, signaling_thread(), [this] {
-            RTC_DCHECK_RUN_ON(signaling_thread());
-            for (const auto& channel : sctp_data_channels_) {
-              channel->OnTransportChannelCreated();
-            }
-          });
+      NotifyDataChannelsOfTransportCreated();
     }
   }
 }
 
+std::vector<DataChannel::Stats> DataChannelController::GetDataChannelStats()
+    const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  std::vector<DataChannel::Stats> stats;
+  stats.reserve(sctp_data_channels_.size());
+  for (const auto& channel : sctp_data_channels_)
+    stats.push_back(channel->GetStats());
+  return stats;
+}
+
 bool DataChannelController::HandleOpenMessage_s(
     const cricket::ReceiveDataParams& params,
     const rtc::CopyOnWriteBuffer& buffer) {
@@ -463,12 +472,6 @@
   return &rtp_data_channels_;
 }
 
-const std::vector<rtc::scoped_refptr<DataChannel>>*
-DataChannelController::sctp_data_channels() const {
-  RTC_DCHECK_RUN_ON(signaling_thread());
-  return &sctp_data_channels_;
-}
-
 void DataChannelController::UpdateClosingRtpDataChannels(
     const std::vector<std::string>& active_channels,
     bool is_local_update) {
@@ -549,6 +552,17 @@
   return false;
 }
 
+void DataChannelController::NotifyDataChannelsOfTransportCreated() {
+  RTC_DCHECK_RUN_ON(network_thread());
+  data_channel_transport_invoker_->AsyncInvoke<void>(
+      RTC_FROM_HERE, signaling_thread(), [this] {
+        RTC_DCHECK_RUN_ON(signaling_thread());
+        for (const auto& channel : sctp_data_channels_) {
+          channel->OnTransportChannelCreated();
+        }
+      });
+}
+
 rtc::Thread* DataChannelController::network_thread() const {
   return pc_->network_thread();
 }
diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h
index 156bbe5..c3e64ab 100644
--- a/pc/data_channel_controller.h
+++ b/pc/data_channel_controller.h
@@ -64,6 +64,9 @@
   void OnTransportChanged(
       DataChannelTransportInterface* data_channel_transport);
 
+  // Called from PeerConnection::GetDataChannelStats on the signaling thread.
+  std::vector<DataChannel::Stats> GetDataChannelStats() const;
+
   // Creates channel and adds it to the collection of DataChannels that will
   // be offered in a SessionDescription.
   rtc::scoped_refptr<DataChannel> InternalCreateDataChannel(
@@ -101,8 +104,6 @@
   void set_data_channel_transport(DataChannelTransportInterface* transport);
   const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
   rtp_data_channels() const;
-  const std::vector<rtc::scoped_refptr<DataChannel>>* sctp_data_channels()
-      const;
 
   sigslot::signal1<DataChannel*>& SignalDataChannelCreated() {
     RTC_DCHECK_RUN_ON(signaling_thread());
@@ -137,6 +138,10 @@
                            const rtc::CopyOnWriteBuffer& payload,
                            cricket::SendDataResult* result);
 
+  // Called when all data channels need to be notified of a transport channel
+  // (calls OnTransportChannelCreated on the signaling thread).
+  void NotifyDataChannelsOfTransportCreated();
+
   rtc::Thread* network_thread() const;
   rtc::Thread* signaling_thread() const;
 
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 76f87f2..9b3b760 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -6225,6 +6225,11 @@
   return ice_config;
 }
 
+std::vector<DataChannel::Stats> PeerConnection::GetDataChannelStats() const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return data_channel_controller_.GetDataChannelStats();
+}
+
 absl::optional<std::string> PeerConnection::sctp_transport_name() const {
   RTC_DCHECK_RUN_ON(signaling_thread());
   if (sctp_mid_s_ && transport_controller_) {
@@ -6705,12 +6710,6 @@
       } else {
         return false;
       }
-
-      // All non-RTP data channels must initialize |sctp_data_channels_|.
-      for (const auto& channel :
-           *data_channel_controller_.sctp_data_channels()) {
-        channel->OnTransportChannelCreated();
-      }
       return true;
     case cricket::DCT_RTP:
     default:
diff --git a/pc/peer_connection.h b/pc/peer_connection.h
index 3bb962b..4425e1c 100644
--- a/pc/peer_connection.h
+++ b/pc/peer_connection.h
@@ -280,11 +280,7 @@
     return data_channel_controller_.rtp_data_channel();
   }
 
-  std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
-      const override {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    return *data_channel_controller_.sctp_data_channels();
-  }
+  std::vector<DataChannel::Stats> GetDataChannelStats() const override;
 
   absl::optional<std::string> sctp_transport_name() const override;
 
diff --git a/pc/peer_connection_internal.h b/pc/peer_connection_internal.h
index 52ffe85..66d585b 100644
--- a/pc/peer_connection_internal.h
+++ b/pc/peer_connection_internal.h
@@ -46,8 +46,11 @@
   // Only valid when using deprecated RTP data channels.
   virtual cricket::RtpDataChannel* rtp_data_channel() const = 0;
 
-  virtual std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
-      const = 0;
+  // Call on the network thread to fetch stats for all the data channels.
+  // TODO(tommi): Make pure virtual after downstream updates.
+  virtual std::vector<DataChannel::Stats> GetDataChannelStats() const {
+    return {};
+  }
 
   virtual absl::optional<std::string> sctp_transport_name() const = 0;
 
diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc
index 5d6792c..f66be30 100644
--- a/pc/rtc_stats_collector.cc
+++ b/pc/rtc_stats_collector.cc
@@ -1275,22 +1275,21 @@
 void RTCStatsCollector::ProduceDataChannelStats_s(
     int64_t timestamp_us,
     RTCStatsReport* report) const {
-  RTC_DCHECK(signaling_thread_->IsCurrent());
-  for (const rtc::scoped_refptr<DataChannel>& data_channel :
-       pc_->sctp_data_channels()) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  std::vector<DataChannel::Stats> data_stats = pc_->GetDataChannelStats();
+  for (const auto& stats : data_stats) {
     std::unique_ptr<RTCDataChannelStats> data_channel_stats(
         new RTCDataChannelStats(
-            "RTCDataChannel_" + rtc::ToString(data_channel->internal_id()),
+            "RTCDataChannel_" + rtc::ToString(stats.internal_id),
             timestamp_us));
-    data_channel_stats->label = data_channel->label();
-    data_channel_stats->protocol = data_channel->protocol();
-    data_channel_stats->data_channel_identifier = data_channel->id();
-    data_channel_stats->state =
-        DataStateToRTCDataChannelState(data_channel->state());
-    data_channel_stats->messages_sent = data_channel->messages_sent();
-    data_channel_stats->bytes_sent = data_channel->bytes_sent();
-    data_channel_stats->messages_received = data_channel->messages_received();
-    data_channel_stats->bytes_received = data_channel->bytes_received();
+    data_channel_stats->label = std::move(stats.label);
+    data_channel_stats->protocol = std::move(stats.protocol);
+    data_channel_stats->data_channel_identifier = stats.id;
+    data_channel_stats->state = DataStateToRTCDataChannelState(stats.state);
+    data_channel_stats->messages_sent = stats.messages_sent;
+    data_channel_stats->bytes_sent = stats.bytes_sent;
+    data_channel_stats->messages_received = stats.messages_received;
+    data_channel_stats->bytes_received = stats.bytes_received;
     report->AddStats(std::move(data_channel_stats));
   }
 }
diff --git a/pc/stats_collector.cc b/pc/stats_collector.cc
index 0509c6d..317e444 100644
--- a/pc/stats_collector.cc
+++ b/pc/stats_collector.cc
@@ -1146,19 +1146,20 @@
 
   rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
 
-  for (const auto& dc : pc_->sctp_data_channels()) {
+  std::vector<DataChannel::Stats> data_stats = pc_->GetDataChannelStats();
+  for (const auto& stats : data_stats) {
     StatsReport::Id id(StatsReport::NewTypedIntId(
-        StatsReport::kStatsReportTypeDataChannel, dc->id()));
+        StatsReport::kStatsReportTypeDataChannel, stats.id));
     StatsReport* report = reports_.ReplaceOrAddNew(id);
     report->set_timestamp(stats_gathering_started_);
-    report->AddString(StatsReport::kStatsValueNameLabel, dc->label());
+    report->AddString(StatsReport::kStatsValueNameLabel, stats.label);
     // Filter out the initial id (-1).
-    if (dc->id() >= 0) {
-      report->AddInt(StatsReport::kStatsValueNameDataChannelId, dc->id());
+    if (stats.id >= 0) {
+      report->AddInt(StatsReport::kStatsValueNameDataChannelId, stats.id);
     }
-    report->AddString(StatsReport::kStatsValueNameProtocol, dc->protocol());
+    report->AddString(StatsReport::kStatsValueNameProtocol, stats.protocol);
     report->AddString(StatsReport::kStatsValueNameState,
-                      DataChannelInterface::DataStateString(dc->state()));
+                      DataChannelInterface::DataStateString(stats.state));
   }
 }
 
diff --git a/pc/test/fake_peer_connection_base.h b/pc/test/fake_peer_connection_base.h
index f4b27f0..e1663e6 100644
--- a/pc/test/fake_peer_connection_base.h
+++ b/pc/test/fake_peer_connection_base.h
@@ -254,11 +254,6 @@
 
   cricket::RtpDataChannel* rtp_data_channel() const override { return nullptr; }
 
-  std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
-      const override {
-    return {};
-  }
-
   absl::optional<std::string> sctp_transport_name() const override {
     return absl::nullopt;
   }
diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h
index f459552..175a1ed 100644
--- a/pc/test/fake_peer_connection_for_stats.h
+++ b/pc/test/fake_peer_connection_for_stats.h
@@ -259,9 +259,12 @@
     return transceivers_;
   }
 
-  std::vector<rtc::scoped_refptr<DataChannel>> sctp_data_channels()
-      const override {
-    return sctp_data_channels_;
+  std::vector<DataChannel::Stats> GetDataChannelStats() const override {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    std::vector<DataChannel::Stats> stats;
+    for (const auto& channel : sctp_data_channels_)
+      stats.push_back(channel->GetStats());
+    return stats;
   }
 
   cricket::CandidateStatsList GetPooledCandidateStats() const override {