Improve performance of RtpPacketHistory
The data structures in RtpPacketHistory were chosen based on assumption
of few packets with possible sparse segments due to missing acking.
In practice high bitrate usages with full histories seem to be more of
a problem.
Due to that, change storage from an std::map to an std::deque and live
with potential segments of nullptr. Also limit size of padding prio
set so that doesn't become a bottleneck.
Bug: webrtc:8975
Change-Id: I3b6314fb3255937d25362ff2cd906efb7b1397f7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/145901
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29117}
diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc
index 85689f9..b369f40 100644
--- a/modules/rtp_rtcp/source/rtp_packet_history.cc
+++ b/modules/rtp_rtcp/source/rtp_packet_history.cc
@@ -23,6 +23,7 @@
namespace webrtc {
constexpr size_t RtpPacketHistory::kMaxCapacity;
+constexpr size_t RtpPacketHistory::kMaxPaddingtHistory;
constexpr int64_t RtpPacketHistory::kMinPacketDurationMs;
constexpr int RtpPacketHistory::kMinPacketDurationRtt;
constexpr int RtpPacketHistory::kPacketCullingDelayFactor;
@@ -130,18 +131,28 @@
// Store packet.
const uint16_t rtp_seq_no = packet->SequenceNumber();
- auto packet_it = packet_history_.emplace(
- rtp_seq_no,
- StoredPacket(std::move(packet), send_time_ms, packets_inserted_++));
- RTC_DCHECK(packet_it.second) << "Failed to insert packet in history.";
- StoredPacket& stored_packet = packet_it.first->second;
+ int packet_index = GetPacketIndex(rtp_seq_no);
+ RTC_DCHECK_GE(packet_index, 0) << "Out-of-order inserts not supported.";
+ size_t index = packet_index;
- if (!start_seqno_) {
- start_seqno_ = rtp_seq_no;
+ while (packet_history_.size() < index) {
+ packet_history_.emplace_back(nullptr, absl::nullopt, 0);
+ }
+ RTC_DCHECK(packet_history_.size() == index ||
+ packet_history_[index].packet_ == nullptr);
+
+ if (packet_history_.size() <= index) {
+ packet_history_.emplace_back(std::move(packet), send_time_ms,
+ packets_inserted_++);
+ } else {
+ packet_history_[packet_index] =
+ StoredPacket(std::move(packet), send_time_ms, packets_inserted_++);
}
- // Store the sequence number of the last send packet with this size.
- auto prio_it = padding_priority_.insert(&stored_packet);
+ if (padding_priority_.size() >= kMaxPaddingtHistory - 1) {
+ padding_priority_.erase(std::prev(padding_priority_.end()));
+ }
+ auto prio_it = padding_priority_.insert(&packet_history_[packet_index]);
RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set.";
}
@@ -152,27 +163,26 @@
return nullptr;
}
+ StoredPacket* packet = GetStoredPacket(sequence_number);
+ if (packet == nullptr) {
+ return nullptr;
+ }
+
int64_t now_ms = clock_->TimeInMilliseconds();
- StoredPacketIterator rtp_it = packet_history_.find(sequence_number);
- if (rtp_it == packet_history_.end()) {
+ if (!VerifyRtt(*packet, now_ms)) {
return nullptr;
}
- StoredPacket& packet = rtp_it->second;
- if (!VerifyRtt(rtp_it->second, now_ms)) {
- return nullptr;
- }
-
- if (packet.send_time_ms_) {
- packet.IncrementTimesRetransmitted(&padding_priority_);
+ if (packet->send_time_ms_) {
+ packet->IncrementTimesRetransmitted(&padding_priority_);
}
// Update send-time and mark as no long in pacer queue.
- packet.send_time_ms_ = now_ms;
- packet.pending_transmission_ = false;
+ packet->send_time_ms_ = now_ms;
+ packet->pending_transmission_ = false;
- // Return copy of packet instance since it may need to be retransmitted again.
- return absl::make_unique<RtpPacketToSend>(*packet.packet_);
+ // Return copy of packet instance since it may need to be retransmitted.
+ return absl::make_unique<RtpPacketToSend>(*packet->packet_);
}
std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
@@ -192,29 +202,26 @@
return nullptr;
}
- int64_t now_ms = clock_->TimeInMilliseconds();
- StoredPacketIterator rtp_it = packet_history_.find(sequence_number);
- if (rtp_it == packet_history_.end()) {
+ StoredPacket* packet = GetStoredPacket(sequence_number);
+ if (packet == nullptr) {
return nullptr;
}
- StoredPacket& packet = rtp_it->second;
-
- if (packet.pending_transmission_) {
+ if (packet->pending_transmission_) {
// Packet already in pacer queue, ignore this request.
return nullptr;
}
- if (!VerifyRtt(rtp_it->second, now_ms)) {
+ if (!VerifyRtt(*packet, clock_->TimeInMilliseconds())) {
// Packet already resent within too short a time window, ignore.
return nullptr;
}
// Copy and/or encapsulate packet.
std::unique_ptr<RtpPacketToSend> encapsulated_packet =
- encapsulate(*packet.packet_);
+ encapsulate(*packet->packet_);
if (encapsulated_packet) {
- packet.pending_transmission_ = true;
+ packet->pending_transmission_ = true;
}
return encapsulated_packet;
@@ -226,20 +233,18 @@
return;
}
- int64_t now_ms = clock_->TimeInMilliseconds();
- StoredPacketIterator rtp_it = packet_history_.find(sequence_number);
- if (rtp_it == packet_history_.end()) {
+ StoredPacket* packet = GetStoredPacket(sequence_number);
+ if (packet == nullptr) {
return;
}
- StoredPacket& packet = rtp_it->second;
- RTC_DCHECK(packet.send_time_ms_);
+ RTC_DCHECK(packet->send_time_ms_);
// Update send-time, mark as no longer in pacer queue, and increment
// transmission count.
- packet.send_time_ms_ = now_ms;
- packet.pending_transmission_ = false;
- packet.IncrementTimesRetransmitted(&padding_priority_);
+ packet->send_time_ms_ = clock_->TimeInMilliseconds();
+ packet->pending_transmission_ = false;
+ packet->IncrementTimesRetransmitted(&padding_priority_);
}
absl::optional<RtpPacketHistory::PacketState> RtpPacketHistory::GetPacketState(
@@ -249,16 +254,21 @@
return absl::nullopt;
}
- auto rtp_it = packet_history_.find(sequence_number);
- if (rtp_it == packet_history_.end()) {
+ int packet_index = GetPacketIndex(sequence_number);
+ if (packet_index < 0 ||
+ static_cast<size_t>(packet_index) >= packet_history_.size()) {
+ return absl::nullopt;
+ }
+ const StoredPacket& packet = packet_history_[packet_index];
+ if (packet.packet_ == nullptr) {
return absl::nullopt;
}
- if (!VerifyRtt(rtp_it->second, clock_->TimeInMilliseconds())) {
+ if (!VerifyRtt(packet, clock_->TimeInMilliseconds())) {
return absl::nullopt;
}
- return StoredPacketToPacketState(rtp_it->second);
+ return StoredPacketToPacketState(packet);
}
bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet,
@@ -317,15 +327,13 @@
void RtpPacketHistory::CullAcknowledgedPackets(
rtc::ArrayView<const uint16_t> sequence_numbers) {
rtc::CritScope cs(&lock_);
- if (mode_ == StorageMode::kDisabled) {
- return;
- }
-
for (uint16_t sequence_number : sequence_numbers) {
- auto stored_packet_it = packet_history_.find(sequence_number);
- if (stored_packet_it != packet_history_.end()) {
- RemovePacket(stored_packet_it);
+ int packet_index = GetPacketIndex(sequence_number);
+ if (packet_index < 0 ||
+ static_cast<size_t>(packet_index) >= packet_history_.size()) {
+ continue;
}
+ RemovePacket(packet_index);
}
}
@@ -335,12 +343,12 @@
return false;
}
- auto rtp_it = packet_history_.find(sequence_number);
- if (rtp_it == packet_history_.end()) {
+ StoredPacket* packet = GetStoredPacket(sequence_number);
+ if (packet == nullptr) {
return false;
}
- rtp_it->second.pending_transmission_ = true;
+ packet->pending_transmission_ = true;
return true;
}
@@ -352,25 +360,21 @@
void RtpPacketHistory::Reset() {
packet_history_.clear();
padding_priority_.clear();
- start_seqno_.reset();
}
void RtpPacketHistory::CullOldPackets(int64_t now_ms) {
int64_t packet_duration_ms =
std::max(kMinPacketDurationRtt * rtt_ms_, kMinPacketDurationMs);
while (!packet_history_.empty()) {
- auto stored_packet_it = packet_history_.find(*start_seqno_);
- RTC_DCHECK(stored_packet_it != packet_history_.end());
-
if (packet_history_.size() >= kMaxCapacity) {
// We have reached the absolute max capacity, remove one packet
// unconditionally.
- RemovePacket(stored_packet_it);
+ RemovePacket(0);
continue;
}
- const StoredPacket& stored_packet = stored_packet_it->second;
- if (stored_packet_it->second.pending_transmission_) {
+ const StoredPacket& stored_packet = packet_history_.front();
+ if (stored_packet.pending_transmission_) {
// Don't remove packets in the pacer queue, pending tranmission.
return;
}
@@ -386,7 +390,7 @@
now_ms) {
// Too many packets in history, or this packet has timed out. Remove it
// and continue.
- RemovePacket(stored_packet_it);
+ RemovePacket(0);
} else {
// No more packets can be removed right now.
return;
@@ -395,46 +399,57 @@
}
std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket(
- StoredPacketIterator packet_it) {
+ int packet_index) {
// Move the packet out from the StoredPacket container.
std::unique_ptr<RtpPacketToSend> rtp_packet =
- std::move(packet_it->second.packet_);
-
- // Check if this is the oldest packet in the history, as this must be updated
- // in order to cull old packets.
- const bool is_first_packet = packet_it->first == start_seqno_;
+ std::move(packet_history_[packet_index].packet_);
// Erase from padding priority set, if eligible.
- size_t num_erased = padding_priority_.erase(&packet_it->second);
- RTC_DCHECK_EQ(num_erased, 1)
- << "Failed to remove one packet from prio set, got " << num_erased;
- if (num_erased != 1) {
- RTC_LOG(LS_ERROR) << "RtpPacketHistory in inconsistent state, resetting.";
- Reset();
- return nullptr;
- }
+ padding_priority_.erase(&packet_history_[packet_index]);
- // Erase the packet from the map, and capture iterator to the next one.
- StoredPacketIterator next_it = packet_history_.erase(packet_it);
-
- if (is_first_packet) {
- // |next_it| now points to the next element, or to the end. If the end,
- // check if we can wrap around.
- if (next_it == packet_history_.end()) {
- next_it = packet_history_.begin();
- }
-
- // Update |start_seq_no| to the new oldest item.
- if (next_it != packet_history_.end()) {
- start_seqno_ = next_it->first;
- } else {
- start_seqno_.reset();
+ if (packet_index == 0) {
+ while (!packet_history_.empty() &&
+ packet_history_.front().packet_ == nullptr) {
+ packet_history_.pop_front();
}
}
return rtp_packet;
}
+int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const {
+ if (packet_history_.empty()) {
+ return 0;
+ }
+
+ RTC_DCHECK(packet_history_.front().packet_ != nullptr);
+ int first_seq = packet_history_.front().packet_->SequenceNumber();
+ if (first_seq == sequence_number) {
+ return 0;
+ }
+
+ if (IsNewerSequenceNumber(sequence_number, first_seq)) {
+ // New packet is ahead of start of list. Find the delta.
+ int packet_index = sequence_number - first_seq;
+ if (packet_index < 0) {
+ // A wrap-around has occurred, unwrap to get a valid index.
+ packet_index += 1 << 16;
+ }
+ return packet_index;
+ }
+
+ return -1;
+}
+
+RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket(
+ uint16_t sequence_number) {
+ int index = GetPacketIndex(sequence_number);
+ if (index < 0 || static_cast<size_t>(index) >= packet_history_.size()) {
+ return nullptr;
+ }
+ return &packet_history_[index];
+}
+
RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState(
const RtpPacketHistory::StoredPacket& stored_packet) {
RtpPacketHistory::PacketState state;
diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h
index 4850c75..9253ede 100644
--- a/modules/rtp_rtcp/source/rtp_packet_history.h
+++ b/modules/rtp_rtcp/source/rtp_packet_history.h
@@ -11,6 +11,7 @@
#ifndef MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_
#define MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_
+#include <deque>
#include <map>
#include <memory>
#include <set>
@@ -53,6 +54,8 @@
// Maximum number of packets we ever allow in the history.
static constexpr size_t kMaxCapacity = 9600;
+ // Maximum number of entries in prioritized queue of padding packets.
+ static constexpr size_t kMaxPaddingtHistory = 63;
// Don't remove packets within max(1000ms, 3x RTT).
static constexpr int64_t kMinPacketDurationMs = 1000;
static constexpr int kMinPacketDurationRtt = 3;
@@ -171,8 +174,6 @@
bool operator()(StoredPacket* lhs, StoredPacket* rhs) const;
};
- using StoredPacketIterator = std::map<uint16_t, StoredPacket>::iterator;
-
// Helper method used by GetPacketAndSetSendTime() and GetPacketState() to
// check if packet has too recently been sent.
bool VerifyRtt(const StoredPacket& packet, int64_t now_ms) const
@@ -181,7 +182,11 @@
void CullOldPackets(int64_t now_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Removes the packet from the history, and context/mapping that has been
// stored. Returns the RTP packet instance contained within the StoredPacket.
- std::unique_ptr<RtpPacketToSend> RemovePacket(StoredPacketIterator packet)
+ std::unique_ptr<RtpPacketToSend> RemovePacket(int packet_index)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+ int GetPacketIndex(uint16_t sequence_number) const
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
+ StoredPacket* GetStoredPacket(uint16_t sequence_number)
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
static PacketState StoredPacketToPacketState(
const StoredPacket& stored_packet);
@@ -192,8 +197,13 @@
StorageMode mode_ RTC_GUARDED_BY(lock_);
int64_t rtt_ms_ RTC_GUARDED_BY(lock_);
- // Map from rtp sequence numbers to stored packet.
- std::map<uint16_t, StoredPacket> packet_history_ RTC_GUARDED_BY(lock_);
+ // Queue of stored packets, ordered by sequence number, with older packets in
+ // the front and new packets being added to the back. Note that there may be
+ // wrap-arounds so the back may have a lower sequence number.
+ // Packets may also be removed out-of-order, in which case there will be
+ // instances of StoredPacket with |packet_| set to nullptr. The first and last
+ // entry in the queue will however always be populated.
+ std::deque<StoredPacket> packet_history_ RTC_GUARDED_BY(lock_);
// Total number of packets with inserted.
uint64_t packets_inserted_ RTC_GUARDED_BY(lock_);
@@ -201,10 +211,6 @@
// in GetPayloadPaddingPacket().
PacketPrioritySet padding_priority_ RTC_GUARDED_BY(lock_);
- // The earliest packet in the history. This might not be the lowest sequence
- // number, in case there is a wraparound.
- absl::optional<uint16_t> start_seqno_ RTC_GUARDED_BY(lock_);
-
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtpPacketHistory);
};
} // namespace webrtc
diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc
index 0523ed2..242af16 100644
--- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc
@@ -291,6 +291,38 @@
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
}
+TEST_F(RtpPacketHistoryTest, RemovesLowestPrioPaddingWhenAtMaxCapacity) {
+ // Tests the absolute upper bound on number of packets in the prioritized
+ // set of potential padding packets.
+ const size_t kMaxNumPackets = RtpPacketHistory::kMaxPaddingtHistory;
+ hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets * 2);
+ hist_.SetRtt(1);
+
+ // Add packets until the max is reached, and then yet another one.
+ for (size_t i = 0; i < kMaxNumPackets + 1; ++i) {
+ std::unique_ptr<RtpPacketToSend> packet =
+ CreateRtpPacket(To16u(kStartSeqNum + i));
+ // Don't mark packets as sent, preventing them from being removed.
+ hist_.PutRtpPacket(std::move(packet), fake_clock_.TimeInMilliseconds());
+ }
+
+ // Advance time to allow retransmission/padding.
+ fake_clock_.AdvanceTimeMilliseconds(1);
+
+ // The oldest packet will be least prioritized and has fallen out of the
+ // priority set.
+ for (size_t i = kMaxNumPackets - 1; i > 0; --i) {
+ auto packet = hist_.GetPayloadPaddingPacket();
+ ASSERT_TRUE(packet);
+ EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + i + 1));
+ }
+
+ // Wrap around to newest padding packet again.
+ auto packet = hist_.GetPayloadPaddingPacket();
+ ASSERT_TRUE(packet);
+ EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + kMaxNumPackets));
+}
+
TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) {
const size_t kMaxNumPackets = 10;
hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets);