pw_kvs: Fix garbage collection for redundant entries

Fix garbage collection when using redundant entries.
- Improve the "find sector to write entry to" and "find sector to GC"
  algorithms so they work better with redundant entries.
- Make GC aware of partially written keys, where the first entry has
  been written to flash but garbage collection is needed to free space
  for the remaining redundant entries.

Tested with kEntryRedundancy up to 2.
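
For reference, a minimal sketch of declaring a KVS with redundant entries
(hypothetical capacities and variable names; assumes the KeyValueStoreBuffer
template forwards a redundancy parameter to the constructor argument added
here):

  #include "pw_kvs/flash_memory.h"
  #include "pw_kvs/key_value_store.h"

  // Up to 32 entries across 8 sectors, keeping 2 copies of each entry.
  // `partition` (a FlashPartition) and `format` (an EntryFormat) are
  // assumed to be defined elsewhere.
  pw::kvs::KeyValueStoreBuffer<32, 8, /*kEntryRedundancy=*/2> kvs(
      &partition, format);
  kvs.Init();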

Change-Id: Iff88d970714fa1ece634af926a1838a5f7d5d7e7
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 7f2dc2d..44dd3dc 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -41,12 +41,14 @@
 KeyValueStore::KeyValueStore(FlashPartition* partition,
                              Vector<KeyDescriptor>& key_descriptor_list,
                              Vector<SectorDescriptor>& sector_descriptor_list,
+                             size_t redundancy,
                              span<const EntryFormat> formats,
                              const Options& options)
     : partition_(*partition),
       formats_(formats),
       key_descriptors_(key_descriptor_list),
       sectors_(sector_descriptor_list),
+      redundancy_(redundancy),
       options_(options) {
   Reset();
 }
@@ -313,7 +315,7 @@
     // If the entries have a duplicate transaction ID, add the new (redundant)
     // entry to the existing descriptor.
     if (existing_descriptor->hash() != key_descriptor.hash()) {
-      ERR("Duplicate entry for key %#010" PRIx32 " with transaction ID %" PRIu32
+      ERR("Duplicate entry for key 0x%08" PRIx32 " with transaction ID %" PRIu32
           " has non-matching hash",
           key_descriptor.hash(),
           key_descriptor.transaction_id());
@@ -376,7 +378,7 @@
 
   if (status.ok()) {
    // TODO: figure out how to support logging multiple addresses.
-    DBG("Overwriting entry for key %#010" PRIx32 " in %u sectors including %u",
+    DBG("Overwriting entry for key 0x%08" PRIx32 " in %u sectors including %u",
         key_descriptor->hash(),
         unsigned(key_descriptor->addresses().size()),
         SectorIndex(SectorFromAddress(key_descriptor->address())));
@@ -398,7 +400,7 @@
   TRY(FindExistingKeyDescriptor(key, &key_descriptor));
 
  // TODO: figure out how to support logging multiple addresses.
-  DBG("Writing tombstone for key %#010" PRIx32 " in %u sectors including %u",
+  DBG("Writing tombstone for key 0x%08" PRIx32 " in %u sectors including %u",
       key_descriptor->hash(),
       unsigned(key_descriptor->addresses().size()),
       SectorIndex(SectorFromAddress(key_descriptor->address())));
@@ -410,6 +412,8 @@
   key_buffer_.fill('\0');
 
   Entry entry;
+  // TODO: add support for using one of the redundant entries if reading the
+  // first copy fails.
   if (Entry::Read(
           kvs_.partition_, descriptor_->address(), kvs_.formats_, &entry)
           .ok()) {
@@ -513,6 +517,8 @@
 
 StatusWithSize KeyValueStore::ValueSize(const KeyDescriptor& descriptor) const {
   Entry entry;
+  // TODO: add support for using one of the redundant entries if reading the
+  // first copy fails.
   TRY_WITH_SIZE(
       Entry::Read(partition_, descriptor.address(), formats_, &entry));
 
@@ -545,6 +551,8 @@
 
   for (auto& descriptor : key_descriptors_) {
     if (descriptor.hash() == hash) {
+      // TODO: add support for using one of the redundant entries if reading the
+      // first copy fails.
       TRY(Entry::ReadKey(
           partition_, descriptor.address(), key.size(), key_buffer.data()));
 
@@ -641,7 +649,7 @@
   //   include garbage collecting one or more sectors if needed.
   // - Write the entry to the sector.
   // - Repeat for redundancy number of total entries.
-  for (size_t i = 0; i < internal::kEntryRedundancy; i++) {
+  for (size_t i = 0; i < redundancy_; i++) {
     SectorDescriptor* sector;
     TRY(GetSectorForWrite(&sector, entry_size, key_descriptor));
 
@@ -674,6 +682,7 @@
   Status result = FindSectorWithSpace(
       sector, entry_size, kAppendEntry, key_descriptor->addresses());
 
+  size_t gc_sector_count = 0;
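+  // Number of sectors garbage collected while trying to find space for this
+  // write.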
   bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;
 
   // Do garbage collection as needed, so long as policy allows.
@@ -684,7 +693,7 @@
       do_auto_gc = false;
     }
     // Garbage collect and then try again to find the best sector.
-    Status gc_status = GarbageCollectPartial();
+    Status gc_status = GarbageCollectPartial(key_descriptor);
     if (!gc_status.ok()) {
       if (gc_status == Status::NOT_FOUND) {
         // Not enough space, and no reclaimable bytes, this KVS is full!
@@ -695,6 +704,16 @@
 
     result = FindSectorWithSpace(
         sector, entry_size, kAppendEntry, key_descriptor->addresses());
+
+    gc_sector_count++;
+    // Allow up to (total sectors + 2) GC cycles so that, once all reclaimable
+    // bytes in every sector have been reclaimed, we can still try to free
+    // space by moving entries for other keys into sectors that already hold
+    // copies of the key being written.
+    if (gc_sector_count > (partition_.sector_count() + 2)) {
+      ERR("Did more GC sectors than total sectors!!!!");
+      return Status::RESOURCE_EXHAUSTED;
+    }
   }
 
   if (!result.ok()) {
@@ -729,7 +748,7 @@
   return Status::OK;
 }
 
-Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
+Status KeyValueStore::RelocateEntry(KeyDescriptor* key_descriptor,
                                     KeyValueStore::Address old_address) {
   Entry entry;
   TRY(Entry::Read(partition_, old_address, formats_, &entry));
@@ -743,7 +762,7 @@
   SectorDescriptor* new_sector;
 
   TRY(FindSectorWithSpace(
-      &new_sector, entry.size(), kGarbageCollect, key_descriptor.addresses()));
+      &new_sector, entry.size(), kGarbageCollect, key_descriptor->addresses()));
   const Address new_address = NextWritableAddress(new_sector);
   TRY(MoveEntry(new_address, entry));
 
@@ -752,9 +771,16 @@
 
   // Entry was written successfully; update the key descriptor and the sector
   // descriptors to reflect the new entry.
-  key_descriptor.UpdateAddress(old_address, new_address);
-  new_sector->AddValidBytes(entry.size());
-  SectorFromAddress(old_address)->RemoveValidBytes(entry.size());
+  TRY(key_descriptor->UpdateAddress(old_address, new_address));
+
+  if ((key_descriptor >= key_descriptors_.begin()) &&
+      (key_descriptor < key_descriptors_.end())) {
+    // Only update valid-byte counts when the key_descriptor is in the main
+    // key_descriptors_ list; only those descriptors are accounted for in the
+    // sectors' valid bytes.
+    new_sector->AddValidBytes(entry.size());
+    SectorFromAddress(old_address)->RemoveValidBytes(entry.size());
+  }
 
   return Status::OK;
 }
@@ -782,9 +808,10 @@
   const span value = span(value_buffer.data(), result.size());
   TRY(entry.VerifyChecksum(key, value));
 
-  DBG("Moving %zu B entry with transaction ID %zu to address %#zx",
+  DBG("Moving %zu B entry with transaction ID %zu to to sector %u address %#zx",
       entry.size(),
       size_t(entry.transaction_id()),
+      SectorIndex(SectorFromAddress(new_address)),
       size_t(new_address));
 
   // Step 2: Write the entry to the new location.
@@ -811,6 +838,15 @@
   return Status::OK;
 }
 
+// Find if search_set contains value.
+// TODO: At some point move this to pw_containers, along with adding tests for
+// it.
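+//
+// Example usage, as in the skip-list checks below:
+//   if (Contains(sectors_to_skip, sector)) { continue; }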
+template <typename Container, typename T>
+bool Contains(const Container& search_set, const T& value) {
+  return std::find(std::begin(search_set), std::end(search_set), value) !=
+         std::end(search_set);
+}
+
 // Find either an existing sector with enough space that is not the sector to
 // skip, or an empty sector. Maintains the invariant that there is always at
 // least 1 empty sector except during GC. On GC, skip sectors that have
@@ -823,15 +859,20 @@
   SectorDescriptor* first_empty_sector = nullptr;
   bool at_least_two_empty_sectors = (find_mode == kGarbageCollect);
 
+  // Used for the GC reclaimable bytes check.
+  SectorDescriptor* non_empty_least_reclaimable_sector = nullptr;
+  const size_t sector_size_bytes = partition_.sector_size_bytes();
+
   // Build a vector of sectors to avoid.
-  Vector<const SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
+  Vector<SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
   for (auto& address : addresses_to_skip) {
     sectors_to_skip.push_back(SectorFromAddress(address));
   }
 
-  DBG("Find sector with %zu bytes available, starting with sector %u",
+  DBG("Find sector with %zu bytes available, starting with sector %u, %s",
       size,
-      SectorIndex(last_new_sector_));
+      SectorIndex(last_new_sector_),
+      (find_mode == kAppendEntry) ? "Append" : "GC");
   for (auto& skip_sector : sectors_to_skip) {
     DBG("  Skip sector %u", SectorIndex(skip_sector));
   }
@@ -845,7 +886,7 @@
   // sectors.
   SectorDescriptor* sector = last_new_sector_;
 
-  // Look for a sector to use with enough space. The search uses a 2 priority
+  // Look for a sector to use with enough space. The search uses a 3 priority
   // tier process.
   //
   // Tier 1 is sector that already has valid data. During GC only select a
@@ -855,23 +896,35 @@
   // Tier 2 is find sectors that are empty/erased. While scanning for a partial
   // sector, keep track of the first empty sector and if a second empty sector
   // was seen. If during GC then count the second empty sector as always seen.
+  //
+  // Tier 3 applies only during garbage collection: find sectors with enough
+  // space that are not empty but have recoverable bytes. Pick the sector with
+  // the least recoverable bytes to minimize the likelihood that this sector
+  // will need to be garbage collected soon.
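+  //
+  // The single scan below returns a Tier 1 match immediately, while recording
+  // the best Tier 3 candidate and any empty sectors seen; the Tier 2 and
+  // Tier 3 fall-back checks run after the loop.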
   for (size_t j = 0; j < sectors_.size(); j++) {
     sector += 1;
     if (sector == sectors_.end()) {
       sector = sectors_.begin();
     }
 
-    if (std::find(sectors_to_skip.begin(), sectors_to_skip.end(), sector) !=
-        sectors_to_skip.end()) {
+    // Skip sectors in the skip list.
+    if (Contains(sectors_to_skip, sector)) {
       continue;
     }
 
-    const size_t sector_size_bytes = partition_.sector_size_bytes();
-    if (!sector->Empty(sector_size_bytes) && sector->HasSpace(size) &&
-        ((find_mode == kAppendEntry) ||
-         (sector->RecoverableBytes(sector_size_bytes) == 0))) {
-      *found_sector = sector;
-      return Status::OK;
+    if (!sector->Empty(sector_size_bytes) && sector->HasSpace(size)) {
+      if ((find_mode == kAppendEntry) ||
+          (sector->RecoverableBytes(sector_size_bytes) == 0)) {
+        *found_sector = sector;
+        return Status::OK;
+      } else {
+        if ((non_empty_least_reclaimable_sector == nullptr) ||
+            (sector->RecoverableBytes(sector_size_bytes) <
+             non_empty_least_reclaimable_sector->RecoverableBytes(
+                 sector_size_bytes))) {
+          non_empty_least_reclaimable_sector = sector;
+        }
+      }
     }
 
     if (sector->Empty(sector_size_bytes)) {
@@ -883,10 +936,11 @@
     }
   }
 
-  // If the scan for a partial sector does not find a suitable sector, use the
-  // first empty sector that was found. Normally it is required to keep 1 empty
-  // sector after the sector found here, but that rule does not apply during GC.
-  if (at_least_two_empty_sectors) {
+  // Tier 2 check: If the scan for a partial sector does not find a suitable
+  // sector, use the first empty sector that was found. Normally it is required
+  // to keep 1 empty sector after the sector found here, but that rule does not
+  // apply during GC.
+  if (first_empty_sector != nullptr && at_least_two_empty_sectors) {
     DBG("  Found a usable empty sector; returning the first found (%u)",
         SectorIndex(first_empty_sector));
     last_new_sector_ = first_empty_sector;
@@ -894,29 +948,63 @@
     return Status::OK;
   }
 
+  // Tier 3 check: If we got this far, use the non-empty sector with the least
+  // recoverable bytes.
+  if (non_empty_least_reclaimable_sector != nullptr) {
+    *found_sector = non_empty_least_reclaimable_sector;
+    DBG("  Found a usable sector %u, with %zu B recoverable, in GC",
+        SectorIndex(*found_sector),
+        (*found_sector)->RecoverableBytes(sector_size_bytes));
+    return Status::OK;
+  }
+
   // No sector was found.
   DBG("  Unable to find a usable sector");
   *found_sector = nullptr;
   return Status::RESOURCE_EXHAUSTED;
 }
 
-KeyValueStore::SectorDescriptor* KeyValueStore::FindSectorToGarbageCollect() {
+// TODO: Break this function up into smaller sub-chunks, including creating an
+// abstraction for the sector list. Look into unit testing this as its own
+// thing.
+KeyValueStore::SectorDescriptor* KeyValueStore::FindSectorToGarbageCollect(
+    span<const Address> addresses_to_avoid) {
   const size_t sector_size_bytes = partition_.sector_size_bytes();
   SectorDescriptor* sector_candidate = nullptr;
   size_t candidate_bytes = 0;
 
+  // Build a vector of sectors to avoid.
+  Vector<const SectorDescriptor*, internal::kEntryRedundancy> sectors_to_skip;
+  for (auto& address : addresses_to_avoid) {
+    sectors_to_skip.push_back(SectorFromAddress(address));
+    DBG("    Skip sector %u", SectorIndex(SectorFromAddress(address)));
+  }
+
   // Step 1: Try to find sectors with stale keys and no valid keys (no
   // relocation needed). If any such sectors are found, use the sector with the
   // most reclaimable bytes.
   for (auto& sector : sectors_) {
     if ((sector.valid_bytes() == 0) &&
-        (sector.RecoverableBytes(sector_size_bytes) > candidate_bytes)) {
+        (sector.RecoverableBytes(sector_size_bytes) > candidate_bytes) &&
+        !Contains(sectors_to_skip, &sector)) {
       sector_candidate = &sector;
       candidate_bytes = sector.RecoverableBytes(sector_size_bytes);
     }
   }
 
-  // Step 2: If step 1 yields no sectors, just find the sector with the most
+  // Step 2a: If step 1 yields no sectors, find the sector with the most
+  // reclaimable bytes that is not in the list of sectors to skip.
+  if (sector_candidate == nullptr) {
+    for (auto& sector : sectors_) {
+      if ((sector.RecoverableBytes(sector_size_bytes) > candidate_bytes) &&
+          !Contains(sectors_to_skip, &sector)) {
+        sector_candidate = &sector;
+        candidate_bytes = sector.RecoverableBytes(sector_size_bytes);
+      }
+    }
+  }
+
+  // Step 2b: If step 2a also yields no sectors, find any sector with the most
   // reclaimable bytes.
   if (sector_candidate == nullptr) {
     for (auto& sector : sectors_) {
@@ -927,6 +1015,21 @@
     }
   }
 
+  // Step 3: If no sectors have reclaimable bytes, select the sector with the
+  // most valid bytes. This at least allows entries of existing keys to be
+  // spread to other sectors, including sectors that already have copies of
+  // the current key being written.
+  if (sector_candidate == nullptr) {
+    for (auto& sector : sectors_) {
+      if ((sector.valid_bytes() > candidate_bytes) &&
+          !Contains(sectors_to_skip, &sector)) {
+        sector_candidate = &sector;
+        candidate_bytes = sector.valid_bytes();
+        DBG("    Doing GC on sector with no reclaimable bytes!");
+      }
+    }
+  }
+
   if (sector_candidate != nullptr) {
     DBG("Found sector %u to Garbage Collect, %zu recoverable bytes",
         SectorIndex(sector_candidate),
@@ -958,32 +1061,65 @@
   return Status::OK;
 }
 
-Status KeyValueStore::GarbageCollectPartial() {
-  DBG("Garbage Collect a single sector");
+Status KeyValueStore::GarbageCollectPartial(KeyDescriptor* key_in_progress) {
+  DBG("Garbage Collect a single sector%s",
+      (key_in_progress == nullptr) ? "" : ", with key in progress");
 
   // Step 1: Find the sector to garbage collect
-  SectorDescriptor* sector_to_gc = FindSectorToGarbageCollect();
+  auto addresses = span<const Address>();
+  if (key_in_progress != nullptr) {
+    DBG("  Use addresses to avoid");
+    addresses = key_in_progress->addresses();
+  }
+  SectorDescriptor* sector_to_gc = FindSectorToGarbageCollect(addresses);
 
   if (sector_to_gc == nullptr) {
     // Nothing to GC.
     return Status::NOT_FOUND;
   }
 
-  TRY(GarbageCollectSector(sector_to_gc));
+  TRY(GarbageCollectSector(sector_to_gc, key_in_progress));
   return Status::OK;
 }
 
-Status KeyValueStore::GarbageCollectSector(SectorDescriptor* sector_to_gc) {
-  // Step 1: Move any valid entries in the GC sector to other sectors
-  if (sector_to_gc->valid_bytes() != 0) {
-    for (auto& descriptor : key_descriptors_) {
-      if (AddressInSector(*sector_to_gc, descriptor.address())) {
-        DBG("  Relocate entry");
-        TRY(RelocateEntry(descriptor, descriptor.address()));
+Status KeyValueStore::RelocateKeyAddressesInSector(
+    internal::SectorDescriptor* sector_to_gc,
+    internal::KeyDescriptor* descriptor) {
+  for (auto address : descriptor->addresses()) {
+    if (AddressInSector(*sector_to_gc, address)) {
+      DBG("  Relocate entry for Key 0x%08zx, address %zu",
+          size_t(descriptor->hash()),
+          size_t(address));
+      TRY(RelocateEntry(descriptor, address));
+    }
+  }
+  return Status::OK;
+}
+
+Status KeyValueStore::GarbageCollectSector(SectorDescriptor* sector_to_gc,
+                                           KeyDescriptor* key_in_progress) {
+  // Pre-step: Check if the key in progress has any addresses in the sector to
+  // GC.
+  bool key_in_progress_in_sector = false;
+  if (key_in_progress != nullptr) {
+    for (Address address : key_in_progress->addresses()) {
+      if (AddressInSector(*sector_to_gc, address)) {
+        key_in_progress_in_sector = true;
+        break;
       }
     }
   }
 
+  // Step 1: Move any valid entries in the GC sector to other sectors
+  if (sector_to_gc->valid_bytes() != 0 || key_in_progress_in_sector) {
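+    // The key in progress is relocated first and separately because it may
+    // not yet be in the main key_descriptors_ list.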
+    if (key_in_progress != nullptr) {
+      TRY(RelocateKeyAddressesInSector(sector_to_gc, key_in_progress));
+    }
+    for (auto& descriptor : key_descriptors_) {
+      TRY(RelocateKeyAddressesInSector(sector_to_gc, &descriptor));
+    }
+  }
+
   if (sector_to_gc->valid_bytes() != 0) {
     ERR("  Failed to relocate valid entries from sector being garbage "
         "collected, %zu valid bytes remain",