Don't remove or retransmit packets in the pacer queue.
The main purpose right now of this CL is to avoid the situation
where multiple retransmissions are queued for sending (normally after
network glitch with increased pacer queue length), and some of those
fail sending because the can't be retrieved from the packet history
due to too short time since last sent.
Bug: webrtc:8975, webrtc:10607
Change-Id: I9f6369d83f0b8208e5f57b2dc2fd3f2db7c6fea1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/135164
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27884}
diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc
index b2dacc2..65c8c60 100644
--- a/modules/rtp_rtcp/source/rtp_packet_history.cc
+++ b/modules/rtp_rtcp/source/rtp_packet_history.cc
@@ -118,6 +118,10 @@
stored_packet.send_time_ms = send_time_ms;
stored_packet.storage_type = type;
stored_packet.times_retransmitted = 0;
+ // No send time indicates packet is not sent immediately, but instead will
+ // be put in the pacer queue and later retrieved via
+ // GetPacketAndSetSendTime().
+ stored_packet.pending_transmission = !send_time_ms.has_value();
if (!start_seqno_) {
start_seqno_ = rtp_seq_no;
@@ -150,14 +154,17 @@
++packet.times_retransmitted;
}
- // Update send-time and return copy of packet instance.
+ // Update send-time and mark as no long in pacer queue.
packet.send_time_ms = now_ms;
+ packet.pending_transmission = false;
if (packet.storage_type == StorageType::kDontRetransmit) {
// Non retransmittable packet, so call must come from paced sender.
// Remove from history and return actual packet instance.
return RemovePacket(rtp_it);
}
+
+ // Return copy of packet instance since it may need to be retransmitted.
return absl::make_unique<RtpPacketToSend>(*packet.packet);
}
@@ -249,6 +256,21 @@
}
}
+bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) {
+ rtc::CritScope cs(&lock_);
+ if (mode_ == StorageMode::kDisabled) {
+ return false;
+ }
+
+ auto rtp_it = packet_history_.find(sequence_number);
+ if (rtp_it == packet_history_.end()) {
+ return false;
+ }
+
+ rtp_it->second.pending_transmission = true;
+ return true;
+}
+
void RtpPacketHistory::Reset() {
packet_history_.clear();
packet_size_.clear();
@@ -270,8 +292,8 @@
}
const StoredPacket& stored_packet = stored_packet_it->second;
- if (!stored_packet.send_time_ms) {
- // Don't remove packets that have not been sent.
+ if (stored_packet_it->second.pending_transmission) {
+ // Don't remove packets in the pacer queue, pending tranmission.
return;
}
@@ -341,6 +363,7 @@
state.ssrc = stored_packet.packet->Ssrc();
state.packet_size = stored_packet.packet->size();
state.times_retransmitted = stored_packet.times_retransmitted;
+ state.pending_transmission = stored_packet.pending_transmission;
return state;
}
diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h
index cf87ddf..811d97e 100644
--- a/modules/rtp_rtcp/source/rtp_packet_history.h
+++ b/modules/rtp_rtcp/source/rtp_packet_history.h
@@ -47,6 +47,7 @@
size_t packet_size = 0;
// Number of times RE-transmitted, ie not including the first transmission.
size_t times_retransmitted = 0;
+ bool pending_transmission = false;
};
// Maximum number of packets we ever allow in the history.
@@ -92,6 +93,11 @@
// Cull packets that have been acknowledged as received by the remote end.
void CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers);
+ // Mark packet as queued for transmission. This will prevent premature
+ // removal or duplicate retransmissions in the pacer queue.
+ // Returns true if status was set, false if packet was not found.
+ bool SetPendingTransmission(uint16_t sequence_number);
+
private:
struct StoredPacket {
StoredPacket();
@@ -111,6 +117,9 @@
// The actual packet.
std::unique_ptr<RtpPacketToSend> packet;
+
+ // True if the packet is currently in the pacer queue pending transmission.
+ bool pending_transmission = false;
};
using StoredPacketIterator = std::map<uint16_t, StoredPacket>::iterator;
diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc
index c10fbb6..5a251d9 100644
--- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc
@@ -636,4 +636,79 @@
EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value());
}
+TEST_F(RtpPacketHistoryTest, SetsPendingTransmissionState) {
+ const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2;
+ hist_.SetRtt(kRttMs);
+
+ // Set size to remove old packets as soon as possible.
+ hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1);
+
+ // Add a packet, without send time, indicating it's in pacer queue.
+ hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission,
+ /* send_time_ms = */ absl::nullopt);
+
+ // Packet is pending transmission.
+ absl::optional<RtpPacketHistory::PacketState> packet_state =
+ hist_.GetPacketState(kStartSeqNum);
+ ASSERT_TRUE(packet_state.has_value());
+ EXPECT_TRUE(packet_state->pending_transmission);
+
+ // Packet sent, state should be back to non-pending.
+ EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum));
+ packet_state = hist_.GetPacketState(kStartSeqNum);
+ ASSERT_TRUE(packet_state.has_value());
+ EXPECT_FALSE(packet_state->pending_transmission);
+
+ // Time for a retransmission.
+ fake_clock_.AdvanceTimeMilliseconds(kRttMs);
+ EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum));
+ packet_state = hist_.GetPacketState(kStartSeqNum);
+ ASSERT_TRUE(packet_state.has_value());
+ EXPECT_TRUE(packet_state->pending_transmission);
+
+ // Packet sent.
+ EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum));
+ // Too early for retransmission.
+ ASSERT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value());
+
+ // Retransmission allowed again, it's not in a pending state.
+ fake_clock_.AdvanceTimeMilliseconds(kRttMs);
+ packet_state = hist_.GetPacketState(kStartSeqNum);
+ ASSERT_TRUE(packet_state.has_value());
+ EXPECT_FALSE(packet_state->pending_transmission);
+}
+
+TEST_F(RtpPacketHistoryTest, DontRemovePendingTransmissions) {
+ const int64_t kRttMs = RtpPacketHistory::kMinPacketDurationMs * 2;
+ const int64_t kPacketTimeoutMs =
+ kRttMs * RtpPacketHistory::kMinPacketDurationRtt;
+
+ // Set size to remove old packets as soon as possible.
+ hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 1);
+ hist_.SetRtt(kRttMs);
+
+ // Add a sent packet.
+ hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum), kAllowRetransmission,
+ fake_clock_.TimeInMilliseconds());
+
+ // Advance clock to just before packet timeout.
+ fake_clock_.AdvanceTimeMilliseconds(kPacketTimeoutMs - 1);
+ // Mark as enqueued in pacer.
+ EXPECT_TRUE(hist_.SetPendingTransmission(kStartSeqNum));
+
+ // Advance clock to where packet would have timed out. It should still
+ // be there and pending.
+ fake_clock_.AdvanceTimeMilliseconds(1);
+ absl::optional<RtpPacketHistory::PacketState> packet_state =
+ hist_.GetPacketState(kStartSeqNum);
+ ASSERT_TRUE(packet_state.has_value());
+ EXPECT_TRUE(packet_state->pending_transmission);
+
+ // Packet sent. Now it can be removed.
+ EXPECT_TRUE(hist_.GetPacketAndSetSendTime(kStartSeqNum));
+ hist_.SetRtt(kRttMs); // Force culling of old packets.
+ packet_state = hist_.GetPacketState(kStartSeqNum);
+ ASSERT_FALSE(packet_state.has_value());
+}
+
} // namespace webrtc
diff --git a/modules/rtp_rtcp/source/rtp_sender.cc b/modules/rtp_rtcp/source/rtp_sender.cc
index 55095d7..b60114b 100644
--- a/modules/rtp_rtcp/source/rtp_sender.cc
+++ b/modules/rtp_rtcp/source/rtp_sender.cc
@@ -451,8 +451,8 @@
// don't retransmit too often.
absl::optional<RtpPacketHistory::PacketState> stored_packet =
packet_history_.GetPacketState(packet_id);
- if (!stored_packet) {
- // Packet not found.
+ if (!stored_packet || stored_packet->pending_transmission) {
+ // Packet not found or already queued for retransmission, ignore.
return 0;
}
@@ -468,6 +468,12 @@
}
if (paced_sender_) {
+ // Mark packet as being in pacer queue again, to prevent duplicates.
+ if (!packet_history_.SetPendingTransmission(packet_id)) {
+ // Packet has already been removed from history, return early.
+ return 0;
+ }
+
// Convert from TickTime to Clock since capture_time_ms is based on
// TickTime.
int64_t corrected_capture_tims_ms =
diff --git a/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_unittest.cc
index ba841c2..47f2856 100644
--- a/modules/rtp_rtcp/source/rtp_sender_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_unittest.cc
@@ -797,23 +797,28 @@
EXPECT_TRUE(rtp_sender_->SendToNetwork(std::move(packet),
kAllowRetransmission,
RtpPacketSender::kNormalPriority));
+ // Immediately process send bucket and send packet.
+ rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false,
+ PacedPacketInfo());
+ EXPECT_EQ(1, transport_.packets_sent());
- EXPECT_EQ(0, transport_.packets_sent());
-
- EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority,
- kSsrc, kSeqNum, _, _, _));
-
+ // Retransmit packet.
const int kStoredTimeInMs = 100;
fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs);
+ EXPECT_CALL(mock_paced_sender_, InsertPacket(RtpPacketSender::kNormalPriority,
+ kSsrc, kSeqNum, _, _, _));
+ EXPECT_CALL(mock_rtc_event_log_,
+ LogProxy(SameRtcEventTypeAs(RtcEvent::Type::RtpPacketOutgoing)));
+
EXPECT_EQ(static_cast<int>(packet_size), rtp_sender_->ReSendPacket(kSeqNum));
- EXPECT_EQ(0, transport_.packets_sent());
+ EXPECT_EQ(1, transport_.packets_sent());
rtp_sender_->TimeToSendPacket(kSsrc, kSeqNum, capture_time_ms, false,
PacedPacketInfo());
// Process send bucket. Packet should now be sent.
- EXPECT_EQ(1, transport_.packets_sent());
+ EXPECT_EQ(2, transport_.packets_sent());
EXPECT_EQ(packet_size, transport_.last_sent_packet().size());
webrtc::RTPHeader rtp_header;