pw_kvs: Add KVS error repair/recovery

- Add repair of detected errors in the KVS.
- Add additional error detection in KVS init
- KVS initialization has a new "needs maintenance" state. This is a
  read-only mode because not all of the required KVS system invariants
  have been met.

Change-Id: If13094855d8da03d59011891c8ed5af2ad38ad2d
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index d0ab512..9fa7199 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -48,13 +48,15 @@
       sectors_(sector_descriptor_list, *partition, temp_sectors_to_skip),
       entry_cache_(key_descriptor_list, addresses, redundancy),
       options_(options),
-      initialized_(false),
+      initialized_(InitializationState::kNotInitialized),
       error_detected_(false),
+      error_stats_({}),
       last_transaction_id_(0) {}
 
 Status KeyValueStore::Init() {
-  initialized_ = false;
+  initialized_ = InitializationState::kNotInitialized;
   error_detected_ = false;
+  error_stats_ = {};
   last_transaction_id_ = 0;
   sectors_.Reset();
   entry_cache_.Reset();
@@ -191,26 +193,34 @@
 
   sectors_.set_last_new_sector(newest_key);
 
-  if (error_detected_) {
-    Status recovery_status = Repair();
-    if (recovery_status.ok()) {
-      INF("KVS init: Corruption detected and fully repaired");
-    } else {
-      ERR("KVS init: Corruption detected and unable repair");
-    }
-  }
-
   if (!empty_sector_found) {
-    // TODO: Record/report the error condition and recovery result.
-    Status gc_result = GarbageCollectPartial();
-
-    if (!gc_result.ok()) {
-      ERR("KVS init failed: Unable to maintain required free sector");
-      return Status::INTERNAL;
-    }
+    error_detected_ = true;
   }
 
-  initialized_ = true;
+  if (!error_detected_) {
+    initialized_ = InitializationState::kReady;
+  } else {
+    if (options_.recovery != ErrorRecovery::kManual) {
+      WRN("KVS init: Corruption detected, beginning repair");
+      Status recovery_status = Repair();
+
+      if (recovery_status.ok()) {
+        WRN("KVS init: Corruption detected and fully repaired");
+        initialized_ = InitializationState::kReady;
+        total_corrupt_bytes = 0;
+        corrupt_entries = 0;
+      } else if (recovery_status == Status::RESOURCE_EXHAUSTED) {
+        WRN("KVS init: Unable to maintain required free sector");
+        initialized_ = InitializationState::kNeedsMaintenance;
+      } else {
+        WRN("KVS init: Corruption detected and unable repair");
+        initialized_ = InitializationState::kNeedsMaintenance;
+      }
+    } else {
+      WRN("KVS init: Corruption detected, no repair attempted due to options");
+      initialized_ = InitializationState::kNeedsMaintenance;
+    }
+  }
 
   INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
       "%zu, logical sector size %zu bytes",
@@ -219,6 +229,7 @@
       sectors_.size(),
       partition_.sector_size_bytes());
 
+  // Report any corruption was not repaired.
   if (total_corrupt_bytes > 0) {
     WRN("Found %zu corrupt bytes and %d corrupt entries during init process; "
         "some keys may be missing",
@@ -231,9 +242,12 @@
 }
 
 KeyValueStore::StorageStats KeyValueStore::GetStorageStats() const {
-  StorageStats stats{0, 0, 0};
+  StorageStats stats{};
   const size_t sector_size = partition_.sector_size_bytes();
   bool found_empty_sector = false;
+  stats.corrupt_sectors_recovered = error_stats_.corrupt_sectors_recovered;
+  stats.missing_redundant_entries_recovered =
+      error_stats_.missing_redundant_entries_recovered;
 
   for (const SectorDescriptor& sector : sectors_) {
     stats.in_use_bytes += sector.valid_bytes();
@@ -255,6 +269,28 @@
   return stats;
 }
 
+bool KeyValueStore::CheckForErrors() {
+  // Check for corrupted sectors
+  for (SectorDescriptor& sector : sectors_) {
+    if (sector.corrupt()) {
+      error_detected_ = true;
+      break;
+    }
+  }
+
+  // Check for missing redundancy.
+  if (redundancy() > 1) {
+    for (const EntryMetadata& metadata : entry_cache_) {
+      if (metadata.addresses().size() < redundancy()) {
+        error_detected_ = true;
+        break;
+      }
+    }
+  }
+
+  return error_detected();
+}
+
 Status KeyValueStore::LoadEntry(Address entry_address,
                                 Address* next_entry_address) {
   Entry entry;
@@ -304,7 +340,7 @@
 StatusWithSize KeyValueStore::Get(string_view key,
                                   span<byte> value_buffer,
                                   size_t offset_bytes) const {
-  TRY_WITH_SIZE(CheckOperation(key));
+  TRY_WITH_SIZE(CheckReadOperation(key));
 
   EntryMetadata metadata;
   TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -313,12 +349,11 @@
 }
 
 Status KeyValueStore::PutBytes(string_view key, span<const byte> value) {
+  TRY(CheckWriteOperation(key));
   DBG("Writing key/value; key length=%zu, value length=%zu",
       key.size(),
       value.size());
 
-  TRY(CheckOperation(key));
-
   if (Entry::size(partition_, key, value) > partition_.sector_size_bytes()) {
     DBG("%zu B value with %zu B key cannot fit in one sector",
         value.size(),
@@ -346,7 +381,7 @@
 }
 
 Status KeyValueStore::Delete(string_view key) {
-  TRY(CheckOperation(key));
+  TRY(CheckWriteOperation(key));
 
   EntryMetadata metadata;
   TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -391,7 +426,7 @@
 }
 
 StatusWithSize KeyValueStore::ValueSize(string_view key) const {
-  TRY_WITH_SIZE(CheckOperation(key));
+  TRY_WITH_SIZE(CheckReadOperation(key));
 
   EntryMetadata metadata;
   TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -426,7 +461,7 @@
 Status KeyValueStore::FixedSizeGet(std::string_view key,
                                    void* value,
                                    size_t size_bytes) const {
-  TRY(CheckOperation(key));
+  TRY(CheckWriteOperation(key));
 
   EntryMetadata metadata;
   TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -463,16 +498,31 @@
   return StatusWithSize(entry.value_size());
 }
 
-Status KeyValueStore::CheckOperation(string_view key) const {
+Status KeyValueStore::CheckWriteOperation(string_view key) const {
   if (InvalidKey(key)) {
     return Status::INVALID_ARGUMENT;
   }
+
+  // For normal write operation the KVS must be fully ready.
   if (!initialized()) {
     return Status::FAILED_PRECONDITION;
   }
   return Status::OK;
 }
 
+Status KeyValueStore::CheckReadOperation(string_view key) const {
+  if (InvalidKey(key)) {
+    return Status::INVALID_ARGUMENT;
+  }
+
+  // Operations that are explicitly read-only can be done after init() has been
+  // called but not fully ready (when needing maintenance).
+  if (initialized_ == InitializationState::kNotInitialized) {
+    return Status::FAILED_PRECONDITION;
+  }
+  return Status::OK;
+}
+
 Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
                                                EntryState new_state,
                                                string_view key,
@@ -575,7 +625,7 @@
       do_auto_gc = false;
     }
     // Garbage collect and then try again to find the best sector.
-    Status gc_status = GarbageCollectPartial(reserved);
+    Status gc_status = GarbageCollect(reserved);
     if (!gc_status.ok()) {
       if (gc_status == Status::NOT_FOUND) {
         // Not enough space, and no reclaimable bytes, this KVS is full!
@@ -603,6 +653,16 @@
   return result;
 }
 
+Status KeyValueStore::MarkSectorCorruptIfNotOk(Status status,
+                                               SectorDescriptor* sector) {
+  if (!status.ok()) {
+    DBG("  Sector %u corrupt", sectors_.Index(sector));
+    sector->mark_corrupt();
+    error_detected_ = true;
+  }
+  return status;
+}
+
 Status KeyValueStore::AppendEntry(const Entry& entry,
                                   string_view key,
                                   span<const byte> value) {
@@ -618,11 +678,11 @@
         entry.size(),
         size_t(entry.address()),
         result.size());
-    return result.status();
+    TRY(MarkSectorCorruptIfNotOk(result.status(), &sector));
   }
 
   if (options_.verify_on_write) {
-    TRY(entry.VerifyChecksumInFlash());
+    TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), &sector));
   }
 
   sector.AddValidBytes(result.size());
@@ -648,20 +708,32 @@
 
   const Address new_address = sectors_.NextWritableAddress(*new_sector);
   const StatusWithSize result = entry.Copy(new_address);
-  new_sector->RemoveWritableBytes(result.size());
-  TRY(result);
 
+  TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+  if (options_.verify_on_write) {
+    TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+  }
   // Entry was written successfully; update descriptor's address and the sector
   // descriptors to reflect the new entry.
-  sectors_.FromAddress(address).RemoveValidBytes(result.size());
+  new_sector->RemoveWritableBytes(result.size());
   new_sector->AddValidBytes(result.size());
+  sectors_.FromAddress(address).RemoveValidBytes(result.size());
   address = new_address;
 
   return Status::OK;
 }
 
-Status KeyValueStore::GarbageCollectFull() {
-  DBG("Garbage Collect all sectors");
+Status KeyValueStore::FullMaintenance() {
+  if (initialized_ == InitializationState::kNotInitialized) {
+    return Status::FAILED_PRECONDITION;
+  }
+
+  DBG("Do full maintenance");
+
+  if (error_detected_) {
+    TRY(Repair());
+  }
 
   SectorDescriptor* sector = sectors_.last_new();
 
@@ -678,12 +750,20 @@
     }
   }
 
-  DBG("Garbage Collect all complete");
+  DBG("Full maintenance complete");
   return Status::OK;
 }
 
-Status KeyValueStore::GarbageCollectPartial(
-    span<const Address> reserved_addresses) {
+Status KeyValueStore::GarbageCollect(span<const Address> reserved_addresses) {
+  if (initialized_ == InitializationState::kNotInitialized) {
+    return Status::FAILED_PRECONDITION;
+  }
+
+  // Do automatic repair, if KVS options allow for it.
+  if (error_detected_ && options_.recovery != ErrorRecovery::kManual) {
+    TRY(Repair());
+  }
+
   DBG("Garbage Collect a single sector");
   for (Address address : reserved_addresses) {
     DBG("   Avoid address %u", unsigned(address));
@@ -720,6 +800,7 @@
 
 Status KeyValueStore::GarbageCollectSector(
     SectorDescriptor& sector_to_gc, span<const Address> reserved_addresses) {
+  DBG("  Garbage Collect sector %u", sectors_.Index(sector_to_gc));
   // Step 1: Move any valid entries in the GC sector to other sectors
   if (sector_to_gc.valid_bytes() != 0) {
     for (const EntryMetadata& metadata : entry_cache_) {
@@ -736,7 +817,7 @@
   }
 
   // Step 2: Reinitialize the sector
-  sector_to_gc.set_writable_bytes(0);
+  sector_to_gc.mark_corrupt();
   TRY(partition_.Erase(sectors_.BaseAddress(sector_to_gc), 1));
   sector_to_gc.set_writable_bytes(partition_.sector_size_bytes());
 
@@ -744,6 +825,171 @@
   return Status::OK;
 }
 
+// Add any missing redundant entries/copies for a key.
+Status KeyValueStore::AddRedundantEntries(EntryMetadata& metadata) {
+  SectorDescriptor* new_sector;
+
+  Entry entry;
+
+  // For simplicity use just the first copy. Any known bad copies should have
+  // been removed already.
+  // TODO: Add support to read other copies if needed.
+  TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+  TRY(entry.VerifyChecksumInFlash());
+
+  for (size_t i = metadata.addresses().size();
+       metadata.addresses().size() < redundancy();
+       i++) {
+    TRY(sectors_.FindSpace(&new_sector, entry.size(), metadata.addresses()));
+
+    const Address new_address = sectors_.NextWritableAddress(*new_sector);
+    const StatusWithSize result = entry.Copy(new_address);
+    TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+    if (options_.verify_on_write) {
+      TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+    }
+    // Entry was written successfully; update descriptor's address and the
+    // sector descriptors to reflect the new entry.
+    new_sector->RemoveWritableBytes(result.size());
+    new_sector->AddValidBytes(result.size());
+
+    metadata.AddNewAddress(new_address);
+  }
+  return Status::OK;
+}
+
+Status KeyValueStore::RepairCorruptSectors() {
+  // Try to GC each corrupt sector, even if previous sectors fail. If GC of a
+  // sector failed on the first pass, then do a second pass, since a later
+  // sector might have cleared up space or otherwise unblocked the earlier
+  // failed sector.
+  Status repair_status = Status::OK;
+
+  size_t loop_count = 0;
+  do {
+    loop_count++;
+    // Error of RESOURCE_EXHAUSTED indicates no space found for relocation.
+    // Reset back to OK for the next pass.
+    if (repair_status == Status::RESOURCE_EXHAUSTED) {
+      repair_status = Status::OK;
+    }
+
+    DBG("   Pass %u", unsigned(loop_count));
+    for (SectorDescriptor& sector : sectors_) {
+      if (sector.corrupt()) {
+        DBG("   Found sector %u with corruption", sectors_.Index(sector));
+        Status sector_status = GarbageCollectSector(sector, {});
+        if (sector_status.ok()) {
+          error_stats_.corrupt_sectors_recovered += 1;
+        } else if (repair_status.ok() ||
+                   repair_status == Status::RESOURCE_EXHAUSTED) {
+          repair_status = sector_status;
+        }
+      }
+    }
+    DBG("   Pass %u complete", unsigned(loop_count));
+  } while (!repair_status.ok() && loop_count < 2);
+
+  return repair_status;
+}
+
+Status KeyValueStore::EnsureFreeSectorExists() {
+  Status repair_status = Status::OK;
+  bool empty_sector_found = false;
+
+  DBG("   Find empty sector");
+  for (SectorDescriptor& sector : sectors_) {
+    if (sector.Empty(partition_.sector_size_bytes())) {
+      empty_sector_found = true;
+      DBG("   Empty sector found");
+      break;
+    }
+  }
+  if (empty_sector_found == false) {
+    DBG("   No empty sector found, attempting to GC a free sector");
+    Status sector_status = GarbageCollect(span<const Address, 0>());
+    if (repair_status.ok() && !sector_status.ok()) {
+      DBG("   Unable to free an empty sector");
+      repair_status = sector_status;
+    }
+  }
+
+  return repair_status;
+}
+
+Status KeyValueStore::EnsureEntryRedundancy() {
+  Status repair_status = Status::OK;
+
+  if (redundancy() == 1) {
+    DBG("   Redundancy not in use, nothing to check");
+    return Status::OK;
+  }
+
+  DBG("   Write any needed additional duplicate copies of key to fulfill %u "
+      "redundancy",
+      unsigned(redundancy()));
+  for (const EntryMetadata& metadata : entry_cache_) {
+    if (metadata.addresses().size() >= redundancy()) {
+      continue;
+    }
+
+    DBG("   Key with %u of %u copies found, adding missing copies",
+        unsigned(metadata.addresses().size()),
+        unsigned(redundancy()));
+    // TODO: Add non-const iterator to the entry_cache_ and remove this
+    // const_cast.
+    Status fill_status =
+        AddRedundantEntries(const_cast<EntryMetadata&>(metadata));
+    if (fill_status.ok()) {
+      error_stats_.missing_redundant_entries_recovered += 1;
+      DBG("   Key missing copies added");
+    } else {
+      DBG("   Failed to add key missing copies");
+      if (repair_status.ok()) {
+        repair_status = fill_status;
+      }
+    }
+  }
+
+  return repair_status;
+}
+
+Status KeyValueStore::Repair() {
+  // Collect and return the first error encountered.
+  Status overall_status = Status::OK;
+
+  DBG("KVS repair");
+
+  // Step 1: Garbage collect any sectors marked as corrupt.
+  Status repair_status = RepairCorruptSectors();
+  if (overall_status.ok()) {
+    overall_status = repair_status;
+  }
+
+  // Step 2: Make sure there is at least 1 empty sector. This needs to be a
+  // seperate check of sectors from step 1, because a found empty sector might
+  // get written to by a later GC that fails and does not result in a free
+  // sector.
+  repair_status = EnsureFreeSectorExists();
+  if (overall_status.ok()) {
+    overall_status = repair_status;
+  }
+
+  // Step 3: Make sure each stored key has the full number of redundant
+  // entries.
+  repair_status = EnsureEntryRedundancy();
+  if (overall_status.ok()) {
+    overall_status = repair_status;
+  }
+
+  if (overall_status.ok()) {
+    error_detected_ = false;
+    initialized_ = InitializationState::kReady;
+  }
+  return overall_status;
+}
+
 KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
                                                 string_view key,
                                                 span<const byte> value,
@@ -862,7 +1108,7 @@
 
 void KeyValueStore::LogKeyDescriptor() const {
   DBG("Key descriptors: count %zu", entry_cache_.total_entries());
-  for (auto& metadata : entry_cache_) {
+  for (const EntryMetadata& metadata : entry_cache_) {
     DBG("  - Key: %s, hash %#zx, transaction ID %zu, first address %#zx",
         metadata.state() == EntryState::kDeleted ? "Deleted" : "Valid",
         static_cast<size_t>(metadata.hash()),