Adds ChannelSend-specific encoder task queue.

Before this change, the encoder tasks ran on a shared worker queue.
That made destruction require synchronization to avoid races.
By giving ChannelSend its own encoder queue, we can safely destroy
the object without worrying about leftover tasks, as they will be
stopped when the task queue is destroyed.

For TaskQueue implementations using one thread per TaskQueue, this
will increase the thread count by the number of AudioSendStreams,
which is typically just one.

This is partly a reland of 9b9344742b186b14d87e827e71a1757f4c94b30e

Original change's description:
> Removes lock from ChannelSend.
>
> The lock isn't really needed as encoder_queue_is_active_ can be checked
> on the task queue to provide synchronization.
>
> There is one behavioral change due to this: We will not cancel any currently
> pending encoding tasks when we stop sending, they will be allowed to finish.
>
> Bug: webrtc:10365
> Change-Id: I2b4897dde8d49bc7ee5d2d69694616aee8aaea38
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125096
> Reviewed-by: Oskar Sundbom <ossu@webrtc.org>
> Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#26963}

Bug: webrtc:10365
Change-Id: Iafb84e25d90ec8639359be80fad65763d08e5719
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125740
Reviewed-by: Oskar Sundbom <ossu@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27038}
diff --git a/audio/channel_send.cc b/audio/channel_send.cc
index 19edf3c..9f95ebb 100644
--- a/audio/channel_send.cc
+++ b/audio/channel_send.cc
@@ -31,7 +31,6 @@
 #include "modules/pacing/packet_router.h"
 #include "modules/utility/include/process_thread.h"
 #include "rtc_base/checks.h"
-#include "rtc_base/critical_section.h"
 #include "rtc_base/event.h"
 #include "rtc_base/format_macros.h"
 #include "rtc_base/location.h"
@@ -88,7 +87,7 @@
   friend class VoERtcpObserver;
 
   ChannelSend(Clock* clock,
-              rtc::TaskQueue* encoder_queue,
+              TaskQueueFactory* task_queue_factory,
               ProcessThread* module_process_thread,
               MediaTransportInterface* media_transport,
               OverheadObserver* overhead_observer,
@@ -181,8 +180,6 @@
       rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) override;
 
  private:
-  class ProcessAndEncodeAudioTask;
-
   // From AudioPacketizationCallback in the ACM
   int32_t SendData(AudioFrameType frameType,
                    uint8_t payloadType,
@@ -200,20 +197,23 @@
                        uint8_t payloadType,
                        uint32_t timeStamp,
                        rtc::ArrayView<const uint8_t> payload,
-                       const RTPFragmentationHeader* fragmentation);
+                       const RTPFragmentationHeader* fragmentation)
+      RTC_RUN_ON(encoder_queue_);
 
   int32_t SendMediaTransportAudio(AudioFrameType frameType,
                                   uint8_t payloadType,
                                   uint32_t timeStamp,
                                   rtc::ArrayView<const uint8_t> payload,
-                                  const RTPFragmentationHeader* fragmentation);
+                                  const RTPFragmentationHeader* fragmentation)
+      RTC_RUN_ON(encoder_queue_);
 
   // Return media transport or nullptr if using RTP.
   MediaTransportInterface* media_transport() { return media_transport_; }
 
   // Called on the encoder task queue when a new input audio frame is ready
   // for encoding.
-  void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input);
+  void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input)
+      RTC_RUN_ON(encoder_queue_);
 
   void OnReceivedRtt(int64_t rtt_ms);
 
@@ -267,9 +267,7 @@
 
   const bool use_twcc_plr_for_ana_;
 
-  rtc::CriticalSection encoder_queue_lock_;
-  bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_lock_) = false;
-  rtc::TaskQueue* const encoder_queue_ = nullptr;
+  bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_) = false;
 
   MediaTransportInterface* const media_transport_;
   int media_transport_sequence_number_ RTC_GUARDED_BY(encoder_queue_) = 0;
@@ -286,12 +284,17 @@
       RTC_GUARDED_BY(&media_transport_lock_);
 
   // E2EE Audio Frame Encryption
-  rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor_;
+  rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor_
+      RTC_GUARDED_BY(encoder_queue_);
   // E2EE Frame Encryption Options
   const webrtc::CryptoOptions crypto_options_;
 
   rtc::CriticalSection bitrate_crit_section_;
   int configured_bitrate_bps_ RTC_GUARDED_BY(bitrate_crit_section_) = 0;
+
+  // Defined last to ensure that there are no running tasks when the other
+  // members are destroyed.
+  rtc::TaskQueue encoder_queue_;
 };
 
 const int kTelephoneEventAttenuationdB = 10;
@@ -473,32 +476,13 @@
   RtcpBandwidthObserver* bandwidth_observer_ RTC_GUARDED_BY(crit_);
 };
 
-class ChannelSend::ProcessAndEncodeAudioTask : public rtc::QueuedTask {
- public:
-  ProcessAndEncodeAudioTask(std::unique_ptr<AudioFrame> audio_frame,
-                            ChannelSend* channel)
-      : audio_frame_(std::move(audio_frame)), channel_(channel) {
-    RTC_DCHECK(channel_);
-  }
-
- private:
-  bool Run() override {
-    RTC_DCHECK_RUN_ON(channel_->encoder_queue_);
-    channel_->ProcessAndEncodeAudioOnTaskQueue(audio_frame_.get());
-    return true;
-  }
-
-  std::unique_ptr<AudioFrame> audio_frame_;
-  ChannelSend* const channel_;
-};
-
 int32_t ChannelSend::SendData(AudioFrameType frameType,
                               uint8_t payloadType,
                               uint32_t timeStamp,
                               const uint8_t* payloadData,
                               size_t payloadSize,
                               const RTPFragmentationHeader* fragmentation) {
-  RTC_DCHECK_RUN_ON(encoder_queue_);
+  RTC_DCHECK_RUN_ON(&encoder_queue_);
   rtc::ArrayView<const uint8_t> payload(payloadData, payloadSize);
 
   if (media_transport() != nullptr) {
@@ -521,7 +505,6 @@
                                   uint32_t timeStamp,
                                   rtc::ArrayView<const uint8_t> payload,
                                   const RTPFragmentationHeader* fragmentation) {
-  RTC_DCHECK_RUN_ON(encoder_queue_);
   if (_includeAudioLevelIndication) {
     // Store current audio level in the RTP sender.
     // The level will be used in combination with voice-activity state
@@ -594,7 +577,6 @@
     uint32_t timeStamp,
     rtc::ArrayView<const uint8_t> payload,
     const RTPFragmentationHeader* fragmentation) {
-  RTC_DCHECK_RUN_ON(encoder_queue_);
   // TODO(nisse): Use null _transportPtr for MediaTransport.
   // RTC_DCHECK(_transportPtr == nullptr);
   uint64_t channel_id;
@@ -645,7 +627,7 @@
 }
 
 ChannelSend::ChannelSend(Clock* clock,
-                         rtc::TaskQueue* encoder_queue,
+                         TaskQueueFactory* task_queue_factory,
                          ProcessThread* module_process_thread,
                          MediaTransportInterface* media_transport,
                          OverheadObserver* overhead_observer,
@@ -671,12 +653,13 @@
           new RateLimiter(clock, kMaxRetransmissionWindowMs)),
       use_twcc_plr_for_ana_(
           webrtc::field_trial::FindFullName("UseTwccPlrForAna") == "Enabled"),
-      encoder_queue_(encoder_queue),
       media_transport_(media_transport),
       frame_encryptor_(frame_encryptor),
-      crypto_options_(crypto_options) {
+      crypto_options_(crypto_options),
+      encoder_queue_(task_queue_factory->CreateTaskQueue(
+          "AudioEncoder",
+          TaskQueueFactory::Priority::NORMAL)) {
   RTC_DCHECK(module_process_thread);
-  RTC_DCHECK(encoder_queue);
   module_process_thread_checker_.DetachFromThread();
 
   audio_coding_.reset(AudioCodingModule::Create(AudioCodingModule::Config()));
@@ -763,11 +746,11 @@
   _rtpRtcpModule->SetSendingMediaStatus(true);
   int ret = _rtpRtcpModule->SetSendingStatus(true);
   RTC_DCHECK_EQ(0, ret);
-  {
-    // It is now OK to start posting tasks to the encoder task queue.
-    rtc::CritScope cs(&encoder_queue_lock_);
+  // It is now OK to start processing on the encoder task queue.
+  encoder_queue_.PostTask([this] {
+    RTC_DCHECK_RUN_ON(&encoder_queue_);
     encoder_queue_is_active_ = true;
-  }
+  });
 }
 
 void ChannelSend::StopSend() {
@@ -777,22 +760,12 @@
   }
   sending_ = false;
 
-  // Post a task to the encoder thread which sets an event when the task is
-  // executed. We know that no more encoding tasks will be added to the task
-  // queue for this channel since sending is now deactivated. It means that,
-  // if we wait for the event to bet set, we know that no more pending tasks
-  // exists and it is therfore guaranteed that the task queue will never try
-  // to acccess and invalid channel object.
-  RTC_DCHECK(encoder_queue_);
-
   rtc::Event flush;
-  {
-    // Clear |encoder_queue_is_active_| under lock to prevent any other tasks
-    // than this final "flush task" to be posted on the queue.
-    rtc::CritScope cs(&encoder_queue_lock_);
+  encoder_queue_.PostTask([this, &flush]() {
+    RTC_DCHECK_RUN_ON(&encoder_queue_);
     encoder_queue_is_active_ = false;
-    encoder_queue_->PostTask([&flush]() { flush.Set(); });
-  }
+    flush.Set();
+  });
   flush.Wait(rtc::Event::kForever);
 
   // Reset sending SSRC and sequence number and triggers direct transmission
@@ -1115,20 +1088,24 @@
 void ChannelSend::ProcessAndEncodeAudio(
     std::unique_ptr<AudioFrame> audio_frame) {
   RTC_DCHECK_RUNS_SERIALIZED(&audio_thread_race_checker_);
-  // Avoid posting any new tasks if sending was already stopped in StopSend().
-  rtc::CritScope cs(&encoder_queue_lock_);
-  if (!encoder_queue_is_active_) {
-    return;
-  }
+  struct ProcessAndEncodeAudio {
+    void operator()() {
+      RTC_DCHECK_RUN_ON(&channel->encoder_queue_);
+      if (!channel->encoder_queue_is_active_) {
+        return;
+      }
+      channel->ProcessAndEncodeAudioOnTaskQueue(audio_frame.get());
+    }
+    std::unique_ptr<AudioFrame> audio_frame;
+    ChannelSend* const channel;
+  };
   // Profile time between when the audio frame is added to the task queue and
   // when the task is actually executed.
   audio_frame->UpdateProfileTimeStamp();
-  encoder_queue_->PostTask(std::unique_ptr<rtc::QueuedTask>(
-      new ProcessAndEncodeAudioTask(std::move(audio_frame), this)));
+  encoder_queue_.PostTask(ProcessAndEncodeAudio{std::move(audio_frame), this});
 }
 
 void ChannelSend::ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input) {
-  RTC_DCHECK_RUN_ON(encoder_queue_);
   RTC_DCHECK_GT(audio_input->samples_per_channel_, 0);
   RTC_DCHECK_LE(audio_input->num_channels_, 2);
 
@@ -1233,14 +1210,10 @@
 void ChannelSend::SetFrameEncryptor(
     rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) {
   RTC_DCHECK_RUN_ON(&worker_thread_checker_);
-  rtc::CritScope cs(&encoder_queue_lock_);
-  if (encoder_queue_is_active_) {
-    encoder_queue_->PostTask([this, frame_encryptor]() mutable {
-      this->frame_encryptor_ = std::move(frame_encryptor);
-    });
-  } else {
+  encoder_queue_.PostTask([this, frame_encryptor]() mutable {
+    RTC_DCHECK_RUN_ON(&encoder_queue_);
     frame_encryptor_ = std::move(frame_encryptor);
-  }
+  });
 }
 
 // TODO(sukhanov): Consider moving TargetTransferRate observer to
@@ -1261,7 +1234,7 @@
 
 std::unique_ptr<ChannelSendInterface> CreateChannelSend(
     Clock* clock,
-    rtc::TaskQueue* encoder_queue,
+    TaskQueueFactory* task_queue_factory,
     ProcessThread* module_process_thread,
     MediaTransportInterface* media_transport,
     OverheadObserver* overhead_observer,
@@ -1273,7 +1246,7 @@
     bool extmap_allow_mixed,
     int rtcp_report_interval_ms) {
   return absl::make_unique<ChannelSend>(
-      clock, encoder_queue, module_process_thread, media_transport,
+      clock, task_queue_factory, module_process_thread, media_transport,
       overhead_observer, rtp_transport, rtcp_rtt_stats, rtc_event_log,
       frame_encryptor, crypto_options, extmap_allow_mixed,
       rtcp_report_interval_ms);