pw_kvs: Add support for tracking redundant entries

Add support in the key descriptor for tracking multiple redundant
copies of an entry.
Add support in Init() for finding redundant copies of the same entry.

Change-Id: I19d1e17b6635f4348b69dc37b990daf15c40a813
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index df125d0..8e859f2 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -167,10 +167,11 @@
 
   // For every valid key, increment the valid bytes for that sector.
   for (KeyDescriptor& key_descriptor : key_descriptors_) {
-    Entry entry;
-    TRY(Entry::Read(partition_, key_descriptor.address(), &entry));
-    SectorFromKey(key_descriptor)->AddValidBytes(entry.size());
-
+    for (auto& address : key_descriptor.addresses()) {
+      Entry entry;
+      TRY(Entry::Read(partition_, address, &entry));
+      SectorFromAddress(address)->AddValidBytes(entry.size());
+    }
     if (key_descriptor.IsNewerThan(last_transaction_id_)) {
       last_transaction_id_ = key_descriptor.transaction_id();
       newest_key = &key_descriptor;
@@ -180,7 +181,7 @@
   if (newest_key == nullptr) {
     last_new_sector_ = sectors_.begin();
   } else {
-    last_new_sector_ = SectorFromKey(newest_key);
+    last_new_sector_ = SectorFromAddress(newest_key->addresses().back());
   }
 
   if (!empty_sector_found) {
@@ -259,9 +260,12 @@
   const string_view key(key_buffer.data(), key_length);
 
   TRY(entry.VerifyChecksumInFlash(entry_header_format_.checksum));
+
+  // A valid entry was found, so update the next entry address before doing any
+  // of the checks that happen in AppendNewOrOverwriteStaleExistingDescriptor().
+  *next_entry_address = entry.next_address();
   TRY(AppendNewOrOverwriteStaleExistingDescriptor(entry.descriptor(key)));
 
-  *next_entry_address = entry.next_address();
   return Status::OK;
 }
 
@@ -313,16 +317,30 @@
                  existing_descriptor->transaction_id())) {
     // Existing entry is old; replace the existing entry with the new one.
     *existing_descriptor = key_descriptor;
-  } else {
-    // Otherwise, check if the entries have a duplicate transaction ID, which is
-    // not valid.
-    if (existing_descriptor->transaction_id() ==
-        key_descriptor.transaction_id()) {
-      ERR("Data loss: Duplicated old(=%zu) and new(=%zu) transaction ID",
-          size_t(existing_descriptor->transaction_id()),
-          size_t(key_descriptor.transaction_id()));
+  } else if (existing_descriptor->transaction_id() ==
+             key_descriptor.transaction_id()) {
+    // If the entries have a duplicate transaction ID, add the new (redundant)
+    // entry to the existing descriptor.
+    if (existing_descriptor->hash() != key_descriptor.hash()) {
+      ERR("Duplicate entry for key %#010" PRIx32 " with transaction ID %" PRIu32
+          " has non-matching hash",
+          key_descriptor.hash(),
+          key_descriptor.transaction_id());
       return Status::DATA_LOSS;
     }
+
+    // Verify that this entry is not in the same sector as an existing copy of
+    // this same key.
+    for (auto address : existing_descriptor->addresses()) {
+      if (SectorFromAddress(address) ==
+          SectorFromAddress(key_descriptor.address())) {
+        DBG("Multiple Redundant entries in same sector %u",
+            SectorIndex(SectorFromAddress(address)));
+        return Status::DATA_LOSS;
+      }
+    }
+    existing_descriptor->addresses().push_back(key_descriptor.address());
+  } else {
     DBG("Found stale entry when appending; ignoring");
   }
   return Status::OK;
@@ -366,9 +384,11 @@
   Status status = FindKeyDescriptor(key, &key_descriptor);
 
   if (status.ok()) {
-    DBG("Overwriting entry for key %#08" PRIx32 " in sector %u",
+    // TODO: figure out how logging should support multiple addresses.
+    DBG("Overwriting entry for key %#08" PRIx32 " in %u sectors including %u",
         key_descriptor->hash(),
-        SectorIndex(SectorFromKey(key_descriptor)));
+        unsigned(key_descriptor->addresses().size()),
+        SectorIndex(SectorFromAddress(key_descriptor->address())));
     return WriteEntryForExistingKey(
         key_descriptor, KeyDescriptor::kValid, key, value);
   }
@@ -386,9 +406,11 @@
   KeyDescriptor* key_descriptor;
   TRY(FindExistingKeyDescriptor(key, &key_descriptor));
 
-  DBG("Writing tombstone for key %#08" PRIx32 " in sector %u",
+  // TODO: figure out how logging should support multiple addresses.
+  DBG("Writing tombstone for key %#08" PRIx32 " in %u sectors including %u",
       key_descriptor->hash(),
-      SectorIndex(SectorFromKey(key_descriptor)));
+      unsigned(key_descriptor->addresses().size()),
+      SectorIndex(SectorFromAddress(key_descriptor->address())));
   return WriteEntryForExistingKey(
       key_descriptor, KeyDescriptor::kDeleted, key, {});
 }
@@ -568,7 +590,6 @@
   // Find the original entry and sector to update the sector's valid_bytes.
   Entry original_entry;
   TRY(Entry::Read(partition_, key_descriptor->address(), &original_entry));
-  SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
 
   SectorDescriptor* sector;
   TRY(FindOrRecoverSectorWithSpace(&sector,
@@ -577,18 +598,15 @@
       SectorIndex(sector),
       SectorBaseAddress(sector));
 
-  if (old_sector != SectorFromKey(key_descriptor)) {
-    DBG("Sector for old entry (size %zu) was garbage collected. Old entry "
-        "relocated to sector %u",
-        original_entry.size(),
-        SectorIndex(SectorFromKey(key_descriptor)));
-
-    old_sector = SectorFromKey(key_descriptor);
-  }
+  // TODO: Verify the copy does a full copy including the address vector.
+  KeyDescriptor old_key_descriptor = *key_descriptor;
 
   TRY(AppendEntry(sector, key_descriptor, key, value, new_state));
 
-  old_sector->RemoveValidBytes(original_entry.size());
+  for (auto& address : old_key_descriptor.addresses()) {
+    SectorFromAddress(address)->RemoveValidBytes(original_entry.size());
+  }
+
   return Status::OK;
 }
 
@@ -615,7 +633,8 @@
   return Status::OK;
 }
 
-Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor) {
+Status KeyValueStore::RelocateEntry(KeyDescriptor& key_descriptor,
+                                    KeyValueStore::Address address) {
   struct TempEntry {
     Entry::KeyBuffer key;
     std::array<byte, sizeof(working_buffer_) - sizeof(key)> value;
@@ -623,8 +642,8 @@
   auto [key_buffer, value_buffer] =
       *std::launder(reinterpret_cast<TempEntry*>(working_buffer_.data()));
 
-  DBG("Relocating entry at %zx for key %" PRIx32,
-      size_t(key_descriptor.address()),
+  DBG("Relocating entry at %zx for key %#010" PRIx32,
+      size_t(address),
       key_descriptor.hash());
 
   // Read the entry to be relocated. Store the entry in a local variable and
@@ -644,8 +663,6 @@
   const span value = span(value_buffer.data(), result.size());
   TRY(entry.VerifyChecksum(entry_header_format_.checksum, key, value));
 
-  SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
-
   // Find a new sector for the entry and write it to the new location. For
   // relocation the find should not not be a sector already containing the key
   // but can be the always empty sector, since this is part of the GC process
@@ -654,16 +671,36 @@
   // an immediate extra relocation).
   SectorDescriptor* new_sector;
 
-  // TODO: For redundancy work, replace old_sector_const with a vector of
-  // sectors to avoid.
-  const SectorDescriptor* old_sector_const = old_sector;
+  // Build a vector of sectors to avoid.
+  Vector<SectorDescriptor*, internal::kEntryRedundancy> old_sectors;
+  for (auto& address : key_descriptor.addresses()) {
+    old_sectors.push_back(SectorFromAddress(address));
+  }
+
+  // TODO: Remove this once const span can take a non-const span.
+  auto old_sectors_const =
+      span(const_cast<const SectorDescriptor**>(old_sectors.data()),
+           old_sectors.size());
+
   TRY(FindSectorWithSpace(
-      &new_sector, entry.size(), kGarbageCollect, span(&old_sector_const, 1)));
+      &new_sector, entry.size(), kGarbageCollect, old_sectors_const));
+
+  // TODO: This appends an entry with a new transaction ID. This needs to be
+  // changed to write a copy of this entry with the same transaction ID.
   TRY(AppendEntry(
       new_sector, &key_descriptor, key, value, key_descriptor.state()));
 
   // Do the valid bytes accounting for the sector the entry was relocated from.
-  old_sector->RemoveValidBytes(entry.size());
+  // TODO: AppendEntry() creates an entry with a new transaction ID. While that
+  // is the case, all the old sectors need their valid bytes removed. Once it
+  // is switched over to copy the current entry with the same transaction ID,
+  // the valid bytes need to be removed from only the one sector being
+  // relocated out of.
+  //  SectorFromAddress(address)->RemoveValidBytes(entry.size());
+  (void)address;
+  for (auto& old_sector : old_sectors) {
+    old_sector->RemoveValidBytes(entry.size());
+  }
 
   return Status::OK;
 }
@@ -842,7 +879,7 @@
     for (auto& descriptor : key_descriptors_) {
       if (AddressInSector(*sector_to_gc, descriptor.address())) {
         DBG("  Relocate entry");
-        TRY(RelocateEntry(descriptor));
+        TRY(RelocateEntry(descriptor, descriptor.address()));
       }
     }
   }