pw_kvs: EntryCache class

- Allocate space for addresses in the derived KeyValueStore class. This
  allows having multiple KeyValueStores with different redundancies,
  without any wasted memory.
- Addresses are moved to a contiguous array instead of per-descriptor
  Vectors, removing the kMaxEntries * sizeof(uint32_t) bytes of Vector
  overhead.
- Create the EntryCache and EntryMetadata abstractions, sketched below.
  These bring together KeyDescriptors and their addresses.
- Move functions for finding and updating KeyDescriptors to the
  EntryCache and EntryMetadata classes.
- Start EntryCache unit tests. The tests will be expanded in future CLs.
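
Below is a rough sketch of the new EntryCache and EntryMetadata interfaces,
paraphrased from the call sites in this diff. The signatures are approximate
and for orientation only; the new entry cache header has the authoritative
declarations.

  namespace pw::kvs::internal {

  // Pairs a KeyDescriptor with the flash addresses of its redundant copies.
  class EntryMetadata {
   public:
    using Address = FlashPartition::Address;

    uint32_t hash() const;
    uint32_t transaction_id() const;
    bool IsNewerThan(uint32_t transaction_id) const;
    bool deleted() const;

    span<Address> addresses() const;  // one address per redundant copy
    Address first_address() const;
    void AddNewAddress(Address address);

    // Replaces this entry's descriptor and addresses with a new entry.
    void Reset(const KeyDescriptor& descriptor, Address address);
  };

  // Caches KeyDescriptors and their addresses. Storage is provided by the
  // derived KeyValueStore, so each instance may use a different redundancy.
  class EntryCache {
   public:
    using Address = FlashPartition::Address;

    EntryCache(Vector<KeyDescriptor>& descriptors,
               Address* addresses,  // max entries * redundancy slots
               size_t redundancy);

    void Reset();

    // During initialization: adds a new entry, attaches a redundant copy to
    // an existing entry, or ignores/replaces a stale entry, as appropriate.
    Status AddNewOrUpdateExisting(const KeyDescriptor& descriptor,
                                  Address address,
                                  size_t sector_size_bytes);

    // Adds an entry that is known not to be present already.
    EntryMetadata AddNew(const KeyDescriptor& descriptor, Address address);

    // Finds the entry for a key; FindExisting also reports deleted entries
    // and hash collisions as NOT_FOUND.
    Status Find(FlashPartition& partition,
                std::string_view key,
                EntryMetadata* metadata) const;
    Status FindExisting(FlashPartition& partition,
                        std::string_view key,
                        EntryMetadata* metadata) const;

    bool full() const;
    size_t total_entries() const;  // includes deleted entries
    size_t max_entries() const;

    // Scratch array of `redundancy` addresses for a pending write.
    Address* TempReservedAddressesForWrite();

    // Iterates over EntryMetadata for all entries, deleted ones included.
    iterator begin() const;
    iterator end() const;
  };

  }  // namespace pw::kvs::internal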

Change-Id: I726ec198c0c4188086357ac6df944a7670c30000
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index d76323e..2ad68fa 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -19,10 +19,7 @@
 #include <cstring>
 #include <type_traits>
 
-#include "pw_kvs/format.h"
-
 #define PW_LOG_USE_ULTRA_SHORT_NAMES 1
-#include "pw_kvs/internal/entry.h"
 #include "pw_kvs_private/macros.h"
 #include "pw_log/log.h"
 
@@ -47,16 +44,18 @@
 }  // namespace
 
 KeyValueStore::KeyValueStore(FlashPartition* partition,
-                             Vector<KeyDescriptor>& key_descriptor_list,
-                             Vector<SectorDescriptor>& sector_descriptor_list,
-                             size_t redundancy,
                              span<const EntryFormat> formats,
-                             const Options& options)
+                             const Options& options,
+                             size_t redundancy,
+                             Vector<SectorDescriptor>& sector_descriptor_list,
+                             const SectorDescriptor** temp_sectors_to_skip,
+                             Vector<KeyDescriptor>& key_descriptor_list,
+                             Address* addresses)
     : partition_(*partition),
       formats_(formats),
-      key_descriptors_(key_descriptor_list),
+      entry_cache_(key_descriptor_list, addresses, redundancy),
       sectors_(sector_descriptor_list),
-      redundancy_(redundancy),
+      temp_sectors_to_skip_(temp_sectors_to_skip),
       options_(options) {
   initialized_ = false;
   last_new_sector_ = nullptr;
@@ -67,7 +66,7 @@
   initialized_ = false;
   last_new_sector_ = nullptr;
   last_transaction_id_ = 0;
-  key_descriptors_.clear();
+  entry_cache_.Reset();
 
   INF("Initializing key value store");
   if (partition_.sector_count() > sectors_.max_size()) {
@@ -172,26 +171,23 @@
   }
 
   DBG("Second pass: Count valid bytes in each sector");
-  const KeyDescriptor* newest_key = nullptr;
+  Address newest_key = 0;
 
   // For every valid key, increment the valid bytes for that sector.
-  for (KeyDescriptor& key_descriptor : key_descriptors_) {
-    for (auto& address : key_descriptor.addresses()) {
+
+  for (const EntryMetadata& metadata : entry_cache_) {
+    for (Address address : metadata.addresses()) {
       Entry entry;
       TRY(Entry::Read(partition_, address, formats_, &entry));
       SectorFromAddress(address)->AddValidBytes(entry.size());
     }
-    if (key_descriptor.IsNewerThan(last_transaction_id_)) {
-      last_transaction_id_ = key_descriptor.transaction_id();
-      newest_key = &key_descriptor;
+    if (metadata.IsNewerThan(last_transaction_id_)) {
+      last_transaction_id_ = metadata.transaction_id();
+      newest_key = metadata.addresses().back();
     }
   }
 
-  if (newest_key == nullptr) {
-    last_new_sector_ = sectors_.begin();
-  } else {
-    last_new_sector_ = SectorFromAddress(newest_key->addresses().back());
-  }
+  last_new_sector_ = SectorFromAddress(newest_key);
 
   if (!empty_sector_found) {
     // TODO: Record/report the error condition and recovery result.
@@ -208,7 +204,7 @@
   INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
       "%zu, logical sector size %zu bytes",
       size(),
-      (key_descriptors_.size() - size()),
+      (entry_cache_.total_entries() - size()),
       sectors_.size(),
       partition_.sector_size_bytes());
 
@@ -261,11 +257,10 @@
   TRY(entry.VerifyChecksumInFlash());
 
   // A valid entry was found, so update the next entry address before doing any
-  // of the checks that happen in AppendNewOrOverwriteStaleExistingDescriptor().
+  // of the checks that happen in AddNewOrUpdateExisting.
   *next_entry_address = entry.next_address();
-  TRY(AppendNewOrOverwriteStaleExistingDescriptor(entry.descriptor(key)));
-
-  return Status::OK;
+  return entry_cache_.AddNewOrUpdateExisting(
+      entry.descriptor(key), entry.address(), partition_.sector_size_bytes());
 }
 
 // Scans flash memory within a sector to find a KVS entry magic.
@@ -295,73 +290,15 @@
   return Status::NOT_FOUND;
 }
 
-// TODO: This method is the trigger of the O(valid_entries * all_entries) time
-// complexity for reading. At some cost to memory, this could be optimized by
-// using a hash table instead of scanning, but in practice this should be fine
-// for a small number of keys
-Status KeyValueStore::AppendNewOrOverwriteStaleExistingDescriptor(
-    const KeyDescriptor& key_descriptor) {
-  // With the new key descriptor, either add it to the descriptor table or
-  // overwrite an existing entry with an older version of the key.
-  KeyDescriptor* existing_descriptor = FindDescriptor(key_descriptor.hash());
-
-  // Write a new entry.
-  if (existing_descriptor == nullptr) {
-    if (key_descriptors_.full()) {
-      return Status::RESOURCE_EXHAUSTED;
-    }
-    key_descriptors_.push_back(key_descriptor);
-  } else if (key_descriptor.IsNewerThan(
-                 existing_descriptor->transaction_id())) {
-    // Existing entry is old; replace the existing entry with the new one.
-    *existing_descriptor = key_descriptor;
-  } else if (existing_descriptor->transaction_id() ==
-             key_descriptor.transaction_id()) {
-    // If the entries have a duplicate transaction ID, add the new (redundant)
-    // entry to the existing descriptor.
-    if (existing_descriptor->hash() != key_descriptor.hash()) {
-      ERR("Duplicate entry for key 0x%08" PRIx32 " with transaction ID %" PRIu32
-          " has non-matching hash",
-          key_descriptor.hash(),
-          key_descriptor.transaction_id());
-      return Status::DATA_LOSS;
-    }
-
-    // Verify that this entry is not in the same sector as an existing copy of
-    // this same key.
-    for (auto address : existing_descriptor->addresses()) {
-      if (SectorFromAddress(address) ==
-          SectorFromAddress(key_descriptor.address())) {
-        DBG("Multiple Redundant entries in same sector %u",
-            SectorIndex(SectorFromAddress(address)));
-        return Status::DATA_LOSS;
-      }
-    }
-    existing_descriptor->addresses().push_back(key_descriptor.address());
-  } else {
-    DBG("Found stale entry when appending; ignoring");
-  }
-  return Status::OK;
-}
-
-KeyValueStore::KeyDescriptor* KeyValueStore::FindDescriptor(uint32_t hash) {
-  for (KeyDescriptor& key_descriptor : key_descriptors_) {
-    if (key_descriptor.hash() == hash) {
-      return &key_descriptor;
-    }
-  }
-  return nullptr;
-}
-
 StatusWithSize KeyValueStore::Get(string_view key,
                                   span<byte> value_buffer,
                                   size_t offset_bytes) const {
   TRY_WITH_SIZE(CheckOperation(key));
 
-  const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
+  EntryMetadata metadata;
+  TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
 
-  return Get(key, *key_descriptor, value_buffer, offset_bytes);
+  return Get(key, metadata, value_buffer, offset_bytes);
 }
 
 Status KeyValueStore::PutBytes(string_view key, span<const byte> value) {
@@ -378,17 +315,16 @@
     return Status::INVALID_ARGUMENT;
   }
 
-  KeyDescriptor* key_descriptor;
-  Status status = FindKeyDescriptor(key, &key_descriptor);
+  EntryMetadata metadata;
+  Status status = entry_cache_.Find(partition_, key, &metadata);
 
   if (status.ok()) {
     // TODO: figure out how logging should support multiple addresses.
-    DBG("Overwriting entry for key 0x%08" PRIx32 " in %u sectors including %u",
-        key_descriptor->hash(),
-        unsigned(key_descriptor->addresses().size()),
-        SectorIndex(SectorFromAddress(key_descriptor->address())));
-    return WriteEntryForExistingKey(
-        key_descriptor, KeyDescriptor::kValid, key, value);
+    DBG("Overwriting entry for key 0x%08" PRIx32 " in %zu sectors including %u",
+        metadata.hash(),
+        metadata.addresses().size(),
+        SectorIndex(SectorFromAddress(metadata.first_address())));
+    return WriteEntryForExistingKey(metadata, EntryState::kValid, key, value);
   }
 
   if (status == Status::NOT_FOUND) {
@@ -401,16 +337,15 @@
 Status KeyValueStore::Delete(string_view key) {
   TRY(CheckOperation(key));
 
-  KeyDescriptor* key_descriptor;
-  TRY(FindExistingKeyDescriptor(key, &key_descriptor));
+  EntryMetadata metadata;
+  TRY(entry_cache_.FindExisting(partition_, key, &metadata));
 
   // TODO: figure out how logging should support multiple addresses.
-  DBG("Writing tombstone for key 0x%08" PRIx32 " in %u sectors including %u",
-      key_descriptor->hash(),
-      unsigned(key_descriptor->addresses().size()),
-      SectorIndex(SectorFromAddress(key_descriptor->address())));
-  return WriteEntryForExistingKey(
-      key_descriptor, KeyDescriptor::kDeleted, key, {});
+  DBG("Writing tombstone for key 0x%08" PRIx32 " in %zu sectors including %u",
+      metadata.hash(),
+      metadata.addresses().size(),
+      SectorIndex(SectorFromAddress(metadata.first_address())));
+  return WriteEntryForExistingKey(metadata, EntryState::kDeleted, key, {});
 }
 
 void KeyValueStore::Item::ReadKey() {
@@ -420,7 +355,7 @@
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
   if (Entry::Read(
-          kvs_.partition_, descriptor_->address(), kvs_.formats_, &entry)
+          kvs_.partition_, iterator_->first_address(), kvs_.formats_, &entry)
           .ok()) {
     entry.ReadKey(key_buffer_);
   }
@@ -428,53 +363,39 @@
 
 KeyValueStore::iterator& KeyValueStore::iterator::operator++() {
   // Skip to the next entry that is valid (not deleted).
-  while (++item_.descriptor_ != item_.kvs_.key_descriptors_.end() &&
-         item_.descriptor_->deleted()) {
+  while (++item_.iterator_ != item_.kvs_.entry_cache_.end() &&
+         item_.iterator_->deleted()) {
   }
   return *this;
 }
 
 KeyValueStore::iterator KeyValueStore::begin() const {
-  const KeyDescriptor* descriptor = key_descriptors_.begin();
+  internal::EntryCache::iterator cache_iterator = entry_cache_.begin();
   // Skip over any deleted entries at the start of the descriptor list.
-  while (descriptor != key_descriptors_.end() && descriptor->deleted()) {
-    ++descriptor;
+  while (cache_iterator != entry_cache_.end() && cache_iterator->deleted()) {
+    ++cache_iterator;
   }
-  return iterator(*this, descriptor);
-}
-
-// TODO(hepler): The valid entry count could be tracked in the KVS to avoid the
-// need for this for-loop.
-size_t KeyValueStore::size() const {
-  size_t valid_entries = 0;
-
-  for (const KeyDescriptor& key_descriptor : key_descriptors_) {
-    if (!key_descriptor.deleted()) {
-      valid_entries += 1;
-    }
-  }
-
-  return valid_entries;
+  return iterator(*this, cache_iterator);
 }
 
 StatusWithSize KeyValueStore::ValueSize(string_view key) const {
   TRY_WITH_SIZE(CheckOperation(key));
 
-  const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
+  EntryMetadata metadata;
+  TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
 
-  return ValueSize(*key_descriptor);
+  return ValueSize(metadata);
 }
 
 StatusWithSize KeyValueStore::Get(string_view key,
-                                  const KeyDescriptor& descriptor,
+                                  const EntryMetadata& metadata,
                                   span<std::byte> value_buffer,
                                   size_t offset_bytes) const {
   Entry entry;
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
   TRY_WITH_SIZE(
-      Entry::Read(partition_, descriptor.address(), formats_, &entry));
+      Entry::Read(partition_, metadata.first_address(), formats_, &entry));
 
   StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
   if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
@@ -495,19 +416,19 @@
                                    size_t size_bytes) const {
   TRY(CheckOperation(key));
 
-  const KeyDescriptor* descriptor;
-  TRY(FindExistingKeyDescriptor(key, &descriptor));
+  EntryMetadata metadata;
+  TRY(entry_cache_.FindExisting(partition_, key, &metadata));
 
-  return FixedSizeGet(key, *descriptor, value, size_bytes);
+  return FixedSizeGet(key, metadata, value, size_bytes);
 }
 
 Status KeyValueStore::FixedSizeGet(std::string_view key,
-                                   const KeyDescriptor& descriptor,
+                                   const EntryMetadata& metadata,
                                    void* value,
                                    size_t size_bytes) const {
   // Ensure that the size of the stored value matches the size of the type.
   // Otherwise, report error. This check avoids potential memory corruption.
-  TRY_ASSIGN(const size_t actual_size, ValueSize(descriptor));
+  TRY_ASSIGN(const size_t actual_size, ValueSize(metadata));
 
   if (actual_size != size_bytes) {
     DBG("Requested %zu B read, but value is %zu B", size_bytes, actual_size);
@@ -515,17 +436,17 @@
   }
 
   StatusWithSize result =
-      Get(key, descriptor, span(static_cast<byte*>(value), size_bytes), 0);
+      Get(key, metadata, span(static_cast<byte*>(value), size_bytes), 0);
 
   return result.status();
 }
 
-StatusWithSize KeyValueStore::ValueSize(const KeyDescriptor& descriptor) const {
+StatusWithSize KeyValueStore::ValueSize(const EntryMetadata& metadata) const {
   Entry entry;
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
   TRY_WITH_SIZE(
-      Entry::Read(partition_, descriptor.address(), formats_, &entry));
+      Entry::Read(partition_, metadata.first_address(), formats_, &entry));
 
   return StatusWithSize(entry.value_size());
 }
@@ -540,152 +461,97 @@
   return Status::OK;
 }
 
-// Searches for a KeyDescriptor that matches this key and sets *result to point
-// to it if one is found.
-//
-//             OK: there is a matching descriptor and *result is set
-//      NOT_FOUND: there is no descriptor that matches this key, but this key
-//                 has a unique hash (and could potentially be added to the KVS)
-// ALREADY_EXISTS: there is no descriptor that matches this key, but the
-//                 key's hash collides with the hash for an existing descriptor
-//
-Status KeyValueStore::FindKeyDescriptor(string_view key,
-                                        const KeyDescriptor** result) const {
-  const uint32_t hash = internal::Hash(key);
-  Entry::KeyBuffer key_buffer;
-
-  for (auto& descriptor : key_descriptors_) {
-    if (descriptor.hash() == hash) {
-      // TODO: add support for using one of the redundant entries if reading the
-      // first copy fails.
-      TRY(Entry::ReadKey(
-          partition_, descriptor.address(), key.size(), key_buffer.data()));
-
-      if (key == string_view(key_buffer.data(), key.size())) {
-        DBG("Found match for key hash 0x%08" PRIx32, hash);
-        *result = &descriptor;
-        return Status::OK;
-      } else {
-        WRN("Found key hash collision for 0x%08" PRIx32, hash);
-        return Status::ALREADY_EXISTS;
-      }
-    }
-  }
-  return Status::NOT_FOUND;
-}
-
-// Searches for a KeyDescriptor that matches this key and sets *result to point
-// to it if one is found.
-//
-//          OK: there is a matching descriptor and *result is set
-//   NOT_FOUND: there is no descriptor that matches this key
-//
-Status KeyValueStore::FindExistingKeyDescriptor(
-    string_view key, const KeyDescriptor** result) const {
-  Status status = FindKeyDescriptor(key, result);
-
-  // If the key's hash collides with an existing key or if the key is deleted,
-  // treat it as if it is not in the KVS.
-  if (status == Status::ALREADY_EXISTS ||
-      (status.ok() && (*result)->deleted())) {
-    return Status::NOT_FOUND;
-  }
-  return status;
-}
-
-Status KeyValueStore::WriteEntryForExistingKey(KeyDescriptor* key_descriptor,
-                                               KeyDescriptor::State new_state,
+Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
+                                               EntryState new_state,
                                                string_view key,
                                                span<const byte> value) {
   // Read the original entry to get the size for sector accounting purposes.
   Entry entry;
   // TODO: add support for using one of the redundant entries if reading the
   // first copy fails.
-  TRY(Entry::Read(partition_, key_descriptor->address(), formats_, &entry));
+  TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
 
-  return WriteEntry(key, value, new_state, key_descriptor, entry.size());
+  return WriteEntry(key, value, new_state, &metadata, entry.size());
 }
 
 Status KeyValueStore::WriteEntryForNewKey(string_view key,
                                           span<const byte> value) {
-  if (key_descriptors_.full()) {
+  if (entry_cache_.full()) {
     WRN("KVS full: trying to store a new entry, but can't. Have %zu entries",
-        key_descriptors_.size());
+        entry_cache_.total_entries());
     return Status::RESOURCE_EXHAUSTED;
   }
 
-  return WriteEntry(key, value, KeyDescriptor::kValid);
+  return WriteEntry(key, value, EntryState::kValid);
 }
 
 Status KeyValueStore::WriteEntry(string_view key,
                                  span<const byte> value,
-                                 KeyDescriptor::State new_state,
-                                 KeyDescriptor* prior_descriptor,
+                                 EntryState new_state,
+                                 EntryMetadata* prior_metadata,
                                  size_t prior_size) {
   const size_t entry_size = Entry::size(partition_, key, value);
 
   // List of addresses for sectors with space for this entry.
-  // TODO: The derived class should allocate space for this address list.
-  Vector<Address, internal::kEntryRedundancy> addresses;
+  Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();
 
   // Find sectors to write the entry to. This may involve garbage collecting one
   // or more sectors.
-  for (size_t i = 0; i < redundancy_; i++) {
+  for (size_t i = 0; i < redundancy(); i++) {
     SectorDescriptor* sector;
-    TRY(GetSectorForWrite(&sector, entry_size, addresses));
+    TRY(GetSectorForWrite(&sector, entry_size, span(reserved_addresses, i)));
 
     DBG("Found space for entry in sector %u", SectorIndex(sector));
-    addresses.push_back(NextWritableAddress(sector));
+    reserved_addresses[i] = NextWritableAddress(sector);
   }
 
   // Write the entry at the first address that was found.
-  Entry entry = CreateEntry(addresses.front(), key, value, new_state);
+  Entry entry = CreateEntry(reserved_addresses[0], key, value, new_state);
   TRY(AppendEntry(entry, key, value));
 
   // After writing the first entry successfully, update the key descriptors.
   // Once a single new entry is written, the old entries are invalidated.
-  KeyDescriptor& new_descriptor =
-      UpdateKeyDescriptor(entry, key, prior_descriptor, prior_size);
+  EntryMetadata new_metadata =
+      UpdateKeyDescriptor(entry, key, prior_metadata, prior_size);
 
   // Write the additional copies of the entry, if redundancy is greater than 1.
-  for (size_t i = 1; i < addresses.size(); ++i) {
-    entry.set_address(addresses[i]);
+  for (size_t i = 1; i < redundancy(); ++i) {
+    entry.set_address(reserved_addresses[i]);
     TRY(AppendEntry(entry, key, value));
-    new_descriptor.addresses().push_back(addresses[i]);
+    new_metadata.AddNewAddress(reserved_addresses[i]);
   }
   return Status::OK;
 }
 
-internal::KeyDescriptor& KeyValueStore::UpdateKeyDescriptor(
+KeyValueStore::EntryMetadata KeyValueStore::UpdateKeyDescriptor(
     const Entry& entry,
     string_view key,
-    KeyDescriptor* prior_descriptor,
+    EntryMetadata* prior_metadata,
     size_t prior_size) {
   // If there is no prior descriptor, create a new one.
-  if (prior_descriptor == nullptr) {
-    key_descriptors_.push_back(entry.descriptor(key));
-    return key_descriptors_.back();
+  if (prior_metadata == nullptr) {
+    return entry_cache_.AddNew(entry.descriptor(key), entry.address());
   }
 
   // Remove valid bytes for the old entry and its copies, which are now stale.
-  for (Address address : prior_descriptor->addresses()) {
+  for (Address address : prior_metadata->addresses()) {
     SectorFromAddress(address)->RemoveValidBytes(prior_size);
   }
 
-  *prior_descriptor = entry.descriptor(prior_descriptor->hash());
-  return *prior_descriptor;
+  prior_metadata->Reset(entry.descriptor(key), entry.address());
+  return *prior_metadata;
 }
 
-// Find a sector to use for writing a new entry to. Do automatic garbage
+// Finds a sector to use for writing a new entry to. Does automatic garbage
 // collection if needed and allowed.
 //
 //                 OK: Sector found with needed space.
 // RESOURCE_EXHAUSTED: No sector available with the needed space.
 Status KeyValueStore::GetSectorForWrite(SectorDescriptor** sector,
                                         size_t entry_size,
-                                        span<const Address> addresses_to_skip) {
+                                        span<const Address> reserved) {
   Status result =
-      FindSectorWithSpace(sector, entry_size, kAppendEntry, addresses_to_skip);
+      FindSectorWithSpace(sector, entry_size, kAppendEntry, {}, reserved);
 
   size_t gc_sector_count = 0;
   bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;
@@ -698,7 +564,7 @@
       do_auto_gc = false;
     }
     // Garbage collect and then try again to find the best sector.
-    Status gc_status = GarbageCollectPartial(addresses_to_skip);
+    Status gc_status = GarbageCollectPartial(reserved);
     if (!gc_status.ok()) {
       if (gc_status == Status::NOT_FOUND) {
         // Not enough space, and no reclaimable bytes, this KVS is full!
@@ -707,8 +573,8 @@
       return gc_status;
     }
 
-    result = FindSectorWithSpace(
-        sector, entry_size, kAppendEntry, addresses_to_skip);
+    result =
+        FindSectorWithSpace(sector, entry_size, kAppendEntry, {}, reserved);
 
     gc_sector_count++;
     // Allow total sectors + 2 GC cycles so that once reclaimable
@@ -753,9 +619,9 @@
   return Status::OK;
 }
 
-Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
+Status KeyValueStore::RelocateEntry(const EntryMetadata& metadata,
                                     KeyValueStore::Address& address,
-                                    span<const Address> addresses_to_skip) {
+                                    span<const Address> reserved_addresses) {
   Entry entry;
   TRY(Entry::Read(partition_, address, formats_, &entry));
 
@@ -767,34 +633,18 @@
   // an immediate extra relocation).
   SectorDescriptor* new_sector;
 
-  // Avoid both this entry's sectors and sectors in addresses_to_skip.
-  //
-  // This is overly strict. addresses_to_skip is populated when there are
-  // sectors reserved for a new entry. It is safe to garbage collect into these
-  // sectors, as long as there remains room for the pending entry. These
-  // reserved sectors could also be garbage collected if they have recoverable
-  // space.
-  // TODO(hepler): Look into improving garbage collection.
-  //
-  // TODO: The derived class should allocate space for this address list.
-  Vector<Address, internal::kEntryRedundancy* 2> all_addresses_to_skip =
-      key_descriptor.addresses();
-  for (Address address : addresses_to_skip) {
-    if (!Contains(all_addresses_to_skip, address)) {
-      // TODO: DCHECK(!all_addresses_to_skip.full)
-      all_addresses_to_skip.push_back(address);
-    }
-  }
-
-  TRY(FindSectorWithSpace(
-      &new_sector, entry.size(), kGarbageCollect, all_addresses_to_skip));
+  TRY(FindSectorWithSpace(&new_sector,
+                          entry.size(),
+                          kGarbageCollect,
+                          metadata.addresses(),
+                          reserved_addresses));
 
   const Address new_address = NextWritableAddress(new_sector);
   const StatusWithSize result = entry.Copy(new_address);
   new_sector->RemoveWritableBytes(result.size());
   TRY(result);
 
-  // Entry was written successfully; update the key descriptor and the sector
+  // Entry was written successfully; update the descriptor's address and sector
   // descriptors to reflect the new entry.
   SectorFromAddress(address)->RemoveValidBytes(result.size());
   new_sector->AddValidBytes(result.size());
@@ -811,7 +661,8 @@
     SectorDescriptor** found_sector,
     size_t size,
     FindSectorMode find_mode,
-    span<const Address> addresses_to_skip) {
+    span<const Address> addresses_to_skip,
+    span<const Address> reserved_addresses) {
   SectorDescriptor* first_empty_sector = nullptr;
   bool at_least_two_empty_sectors = (find_mode == kGarbageCollect);
 
@@ -819,19 +670,30 @@
   SectorDescriptor* non_empty_least_reclaimable_sector = nullptr;
   const size_t sector_size_bytes = partition_.sector_size_bytes();
 
-  // Build a vector of sectors to avoid.
-  // TODO(hepler): Consolidate the different address / sector lists.
-  Vector<SectorDescriptor*, internal::kEntryRedundancy * 2> sectors_to_skip;
+  // Build a list of sectors to avoid.
+  //
+  // This is overly strict. reserved_addresses is populated when there are
+  // sectors reserved for a new entry. It is safe to garbage collect into
+  // these sectors, as long as there remains room for the pending entry. These
+  // reserved sectors could also be garbage collected if they have recoverable
+  // space. For simplicity, avoid both the relocating key's redundant entries
+  // (addresses_to_skip) and the sectors reserved for pending writes
+  // (reserved_addresses).
+  // TODO(hepler): Look into improving garbage collection.
+  size_t sectors_to_skip = 0;
   for (Address address : addresses_to_skip) {
-    sectors_to_skip.push_back(SectorFromAddress(address));
+    temp_sectors_to_skip_[sectors_to_skip++] = SectorFromAddress(address);
+  }
+  for (Address address : reserved_addresses) {
+    temp_sectors_to_skip_[sectors_to_skip++] = SectorFromAddress(address);
   }
 
   DBG("Find sector with %zu bytes available, starting with sector %u, %s",
       size,
       SectorIndex(last_new_sector_),
       (find_mode == kAppendEntry) ? "Append" : "GC");
-  for (auto& skip_sector : sectors_to_skip) {
-    DBG("  Skip sector %u", SectorIndex(skip_sector));
+  for (size_t i = 0; i < sectors_to_skip; ++i) {
+    DBG("  Skip sector %u", SectorIndex(temp_sectors_to_skip_[i]));
   }
 
   // The last_new_sector_ is the sector that was last selected as the "new empty
@@ -865,7 +727,7 @@
     }
 
     // Skip sectors in the skip list.
-    if (Contains(sectors_to_skip, sector)) {
+    if (Contains(span(temp_sectors_to_skip_, sectors_to_skip), sector)) {
       continue;
     }
 
@@ -925,18 +787,18 @@
 // abstraction for the sector list. Look into being able to unit test this as
 // its own thing.
 KeyValueStore::SectorDescriptor* KeyValueStore::FindSectorToGarbageCollect(
-    span<const Address> addresses_to_avoid) {
+    span<const Address> reserved_addresses) {
   const size_t sector_size_bytes = partition_.sector_size_bytes();
   SectorDescriptor* sector_candidate = nullptr;
   size_t candidate_bytes = 0;
 
   // Build a vector of sectors to avoid.
-  // TODO(hepler): Consolidate the three address / sector lists.
-  Vector<const SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
-  for (auto& address : addresses_to_avoid) {
-    sectors_to_skip.push_back(SectorFromAddress(address));
-    DBG("    Skip sector %u", SectorIndex(SectorFromAddress(address)));
+  for (size_t i = 0; i < reserved_addresses.size(); ++i) {
+    temp_sectors_to_skip_[i] = SectorFromAddress(reserved_addresses[i]);
+    DBG("    Skip sector %u",
+        SectorIndex(SectorFromAddress(reserved_addresses[i])));
   }
+  const span sectors_to_skip(temp_sectors_to_skip_, reserved_addresses.size());
 
   // Step 1: Try to find a sector with stale keys and no valid keys (no
   // relocation needed). If any such sectors are found, use the sector with the
@@ -1009,15 +871,15 @@
 }
 
 Status KeyValueStore::GarbageCollectPartial(
-    span<const Address> addresses_to_skip) {
+    span<const Address> reserved_addresses) {
   DBG("Garbage Collect a single sector");
-  for (auto address : addresses_to_skip) {
+  for (Address address : reserved_addresses) {
     DBG("   Avoid address %u", unsigned(address));
   }
 
   // Step 1: Find the sector to garbage collect
   SectorDescriptor* sector_to_gc =
-      FindSectorToGarbageCollect(addresses_to_skip);
+      FindSectorToGarbageCollect(reserved_addresses);
 
   if (sector_to_gc == nullptr) {
     // Nothing to GC.
@@ -1025,19 +887,19 @@
   }
 
   // Step 2: Garbage collect the selected sector.
-  return GarbageCollectSector(sector_to_gc, addresses_to_skip);
+  return GarbageCollectSector(sector_to_gc, reserved_addresses);
 }
 
 Status KeyValueStore::RelocateKeyAddressesInSector(
-    internal::SectorDescriptor& sector_to_gc,
-    internal::KeyDescriptor& descriptor,
-    span<const Address> addresses_to_skip) {
-  for (FlashPartition::Address& address : descriptor.addresses()) {
+    SectorDescriptor& sector_to_gc,
+    const EntryMetadata& metadata,
+    span<const Address> reserved_addresses) {
+  for (FlashPartition::Address& address : metadata.addresses()) {
     if (AddressInSector(sector_to_gc, address)) {
-      DBG("  Relocate entry for Key 0x%08zx, sector %u",
-          size_t(descriptor.hash()),
+      DBG("  Relocate entry for Key 0x%08" PRIx32 ", sector %u",
+          metadata.hash(),
           SectorIndex(SectorFromAddress(address)));
-      TRY(RelocateEntry(descriptor, address, addresses_to_skip));
+      TRY(RelocateEntry(metadata, address, reserved_addresses));
     }
   }
 
@@ -1045,12 +907,12 @@
 };
 
 Status KeyValueStore::GarbageCollectSector(
-    SectorDescriptor* sector_to_gc, span<const Address> addresses_to_skip) {
+    SectorDescriptor* sector_to_gc, span<const Address> reserved_addresses) {
   // Step 1: Move any valid entries in the GC sector to other sectors
   if (sector_to_gc->valid_bytes() != 0) {
-    for (KeyDescriptor& descriptor : key_descriptors_) {
+    for (const EntryMetadata& metadata : entry_cache_) {
       TRY(RelocateKeyAddressesInSector(
-          *sector_to_gc, descriptor, addresses_to_skip));
+          *sector_to_gc, metadata, reserved_addresses));
     }
   }
 
@@ -1073,7 +935,7 @@
 KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
                                                 string_view key,
                                                 span<const byte> value,
-                                                KeyDescriptor::State state) {
+                                                EntryState state) {
   // Always bump the transaction ID when creating a new entry.
   //
   // Burning transaction IDs prevents inconsistencies between flash and memory
@@ -1088,7 +950,7 @@
   // By always burning transaction IDs, the above problem can't happen.
   last_transaction_id_ += 1;
 
-  if (state == KeyDescriptor::kDeleted) {
+  if (state == EntryState::kDeleted) {
     return Entry::Tombstone(
         partition_, address, formats_.primary(), key, last_transaction_id_);
   }
@@ -1100,7 +962,7 @@
                       last_transaction_id_);
 }
 
-void KeyValueStore::LogDebugInfo() {
+void KeyValueStore::LogDebugInfo() const {
   const size_t sector_size_bytes = partition_.sector_size_bytes();
   DBG("====================== KEY VALUE STORE DUMP =========================");
   DBG(" ");
@@ -1113,18 +975,18 @@
   DBG("  Alignment        = %zu", partition_.alignment_bytes());
   DBG(" ");
   DBG("Key descriptors:");
-  DBG("  Entry count     = %zu", key_descriptors_.size());
-  DBG("  Max entry count = %zu", key_descriptors_.max_size());
+  DBG("  Entry count     = %zu", entry_cache_.total_entries());
+  DBG("  Max entry count = %zu", entry_cache_.max_entries());
   DBG(" ");
   DBG("      #     hash        version    address   address (hex)");
-  for (size_t i = 0; i < key_descriptors_.size(); ++i) {
-    const KeyDescriptor& kd = key_descriptors_[i];
+  size_t i = 0;
+  for (const EntryMetadata& metadata : entry_cache_) {
     DBG("   |%3zu: | %8zx  |%8zu  | %8zu | %8zx",
-        i,
-        size_t(kd.hash()),
-        size_t(kd.transaction_id()),
-        size_t(kd.address()),
-        size_t(kd.address()));
+        i++,
+        size_t(metadata.hash()),
+        size_t(metadata.transaction_id()),
+        size_t(metadata.first_address()),
+        size_t(metadata.first_address()));
   }
   DBG(" ");
 
@@ -1188,13 +1050,13 @@
 }
 
 void KeyValueStore::LogKeyDescriptor() const {
-  DBG("Key descriptors: count %zu", key_descriptors_.size());
-  for (auto& key : key_descriptors_) {
-    DBG("  - Key: %s, hash %#zx, transaction ID %zu, address %#zx",
-        key.deleted() ? "Deleted" : "Valid",
-        static_cast<size_t>(key.hash()),
-        static_cast<size_t>(key.transaction_id()),
-        static_cast<size_t>(key.address()));
+  DBG("Key descriptors: count %zu", entry_cache_.total_entries());
+  for (auto& metadata : entry_cache_) {
+    DBG("  - Key: %s, hash %#zx, transaction ID %zu, first address %#zx",
+        metadata.deleted() ? "Deleted" : "Valid",
+        static_cast<size_t>(metadata.hash()),
+        static_cast<size_t>(metadata.transaction_id()),
+        static_cast<size_t>(metadata.first_address()));
   }
 }
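
For illustration, here is a hypothetical sketch of a derived class supplying
the storage that the KeyValueStore constructor now takes. The name
KeyValueStoreBuffer and its template parameters are assumptions for this
sketch, not necessarily the real derived class:

  template <size_t kMaxEntries, size_t kMaxUsableSectors, size_t kRedundancy = 1>
  class KeyValueStoreBuffer : public KeyValueStore {
   public:
    KeyValueStoreBuffer(FlashPartition* partition,
                        span<const EntryFormat> formats,
                        const Options& options = {})
        : KeyValueStore(partition,
                        formats,
                        options,
                        kRedundancy,
                        sectors_,
                        temp_sectors_to_skip_,
                        key_descriptors_,
                        addresses_) {}

   private:
    Vector<internal::KeyDescriptor, kMaxEntries> key_descriptors_;
    Vector<internal::SectorDescriptor, kMaxUsableSectors> sectors_;

    // Contiguous address block: kRedundancy slots per entry, with no
    // per-entry Vector overhead.
    FlashPartition::Address addresses_[kMaxEntries * kRedundancy];

    // Scratch list for FindSectorWithSpace(): large enough for one entry's
    // redundant copies plus the sectors reserved for a pending write.
    const internal::SectorDescriptor* temp_sectors_to_skip_[2 * kRedundancy];
  };

Because the arrays are sized by template parameters, KVS instances with
different redundancies allocate exactly the space they need, which is the
goal stated in the first bullet of the commit message.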