Revert "Removes lock from ChannelSend."
This reverts commit 9b9344742b186b14d87e827e71a1757f4c94b30e.
Reason for revert: Caused test flakiness.
Original change's description:
> Removes lock from ChannelSend.
>
> The lock isn't really needed as encoder_queue_is_active_ can be checked
> on the task queue to provide synchronization.
>
> There is one behavioral change due to this: We will not cancel any currently
> pending encoding tasks when we stop sending; they will be allowed to finish.
>
> Bug: webrtc:10365
> Change-Id: I2b4897dde8d49bc7ee5d2d69694616aee8aaea38
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125096
> Reviewed-by: Oskar Sundbom <ossu@webrtc.org>
> Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#26963}
TBR=ossu@webrtc.org,srte@webrtc.org
Change-Id: I30409414d3dc7b0be75b14a70dfc4457f5682a8c
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:10365
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125726
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26971}
diff --git a/audio/channel_send.cc b/audio/channel_send.cc
index 813795b..0c8be1f 100644
--- a/audio/channel_send.cc
+++ b/audio/channel_send.cc
@@ -31,6 +31,7 @@
#include "modules/pacing/packet_router.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
+#include "rtc_base/critical_section.h"
#include "rtc_base/event.h"
#include "rtc_base/format_macros.h"
#include "rtc_base/location.h"
@@ -178,6 +179,8 @@
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) override;
private:
+ class ProcessAndEncodeAudioTask;
+
// From AudioPacketizationCallback in the ACM
int32_t SendData(FrameType frameType,
uint8_t payloadType,
@@ -261,7 +264,8 @@
const bool use_twcc_plr_for_ana_;
- bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_) = false;
+ rtc::CriticalSection encoder_queue_lock_;
+ bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_lock_) = false;
rtc::TaskQueue* const encoder_queue_ = nullptr;
MediaTransportInterface* const media_transport_;
@@ -466,6 +470,25 @@
RtcpBandwidthObserver* bandwidth_observer_ RTC_GUARDED_BY(crit_);
};
+class ChannelSend::ProcessAndEncodeAudioTask : public rtc::QueuedTask {
+ public:
+ ProcessAndEncodeAudioTask(std::unique_ptr<AudioFrame> audio_frame,
+ ChannelSend* channel)
+ : audio_frame_(std::move(audio_frame)), channel_(channel) {
+ RTC_DCHECK(channel_);
+ }
+
+ private:
+ bool Run() override {
+ RTC_DCHECK_RUN_ON(channel_->encoder_queue_);
+ channel_->ProcessAndEncodeAudioOnTaskQueue(audio_frame_.get());
+ return true;
+ }
+
+ std::unique_ptr<AudioFrame> audio_frame_;
+ ChannelSend* const channel_;
+};
+
int32_t ChannelSend::SendData(FrameType frameType,
uint8_t payloadType,
uint32_t timeStamp,
@@ -726,11 +749,11 @@
_rtpRtcpModule->SetSendingMediaStatus(true);
int ret = _rtpRtcpModule->SetSendingStatus(true);
RTC_DCHECK_EQ(0, ret);
- // It is now OK to start processing on the encoder task queue.
- encoder_queue_->PostTask([this] {
- RTC_DCHECK_RUN_ON(encoder_queue_);
+ {
+ // It is now OK to start posting tasks to the encoder task queue.
+ rtc::CritScope cs(&encoder_queue_lock_);
encoder_queue_is_active_ = true;
- });
+ }
}
void ChannelSend::StopSend() {
@@ -749,11 +772,13 @@
RTC_DCHECK(encoder_queue_);
rtc::Event flush;
- encoder_queue_->PostTask([this, &flush]() {
- RTC_DCHECK_RUN_ON(encoder_queue_);
+ {
+ // Clear |encoder_queue_is_active_| under lock to prevent any tasks other
+ // than this final "flush task" from being posted on the queue.
+ rtc::CritScope cs(&encoder_queue_lock_);
encoder_queue_is_active_ = false;
- flush.Set();
- });
+ encoder_queue_->PostTask([&flush]() { flush.Set(); });
+ }
flush.Wait(rtc::Event::kForever);
// Reset sending SSRC and sequence number and triggers direct transmission
@@ -1072,21 +1097,16 @@
void ChannelSend::ProcessAndEncodeAudio(
std::unique_ptr<AudioFrame> audio_frame) {
RTC_DCHECK_RUNS_SERIALIZED(&audio_thread_race_checker_);
- struct ProcessAndEncodeAudio {
- void operator()() {
- RTC_DCHECK_RUN_ON(channel->encoder_queue_);
- if (!channel->encoder_queue_is_active_) {
- return;
- }
- channel->ProcessAndEncodeAudioOnTaskQueue(audio_frame.get());
- }
- std::unique_ptr<AudioFrame> audio_frame;
- ChannelSend* const channel;
- };
+ // Avoid posting any new tasks if sending was already stopped in StopSend().
+ rtc::CritScope cs(&encoder_queue_lock_);
+ if (!encoder_queue_is_active_) {
+ return;
+ }
// Profile time between when the audio frame is added to the task queue and
// when the task is actually executed.
audio_frame->UpdateProfileTimeStamp();
- encoder_queue_->PostTask(ProcessAndEncodeAudio{std::move(audio_frame), this});
+ encoder_queue_->PostTask(std::unique_ptr<rtc::QueuedTask>(
+ new ProcessAndEncodeAudioTask(std::move(audio_frame), this)));
}
void ChannelSend::ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input) {
@@ -1195,12 +1215,13 @@
void ChannelSend::SetFrameEncryptor(
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
- if (sending_) {
+ rtc::CritScope cs(&encoder_queue_lock_);
+ if (encoder_queue_is_active_) {
encoder_queue_->PostTask([this, frame_encryptor]() mutable {
this->frame_encryptor_ = std::move(frame_encryptor);
});
} else {
- this->frame_encryptor_ = std::move(frame_encryptor);
+ frame_encryptor_ = std::move(frame_encryptor);
}
}