pw_kvs: Add initial parts self-repair from errors

Add self-repair/recovery when the KVS detects errors. This repair is two parts:
- Ensure that each stored key has the proper number of redundant entries.
- Garbage collect (erase and re-init) any sectors with corrupt bytes detected.

This change has error detection in KVS init() and uses a stubbed out Recover().
The actual repair process and error detection during normal operation (Get, Put,
GC) is WIP and not included in this change.

Change-Id: Id3398dd91a5665be8eb1a3aba57961cbbaf9411d
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 091c244..50af414 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -56,14 +56,15 @@
       entry_cache_(key_descriptor_list, addresses, redundancy),
       sectors_(sector_descriptor_list),
       temp_sectors_to_skip_(temp_sectors_to_skip),
-      options_(options) {
-  initialized_ = false;
-  last_new_sector_ = nullptr;
-  last_transaction_id_ = 0;
-}
+      options_(options),
+      initialized_(false),
+      error_detected_(false),
+      last_new_sector_(nullptr),
+      last_transaction_id_(0) {}
 
 Status KeyValueStore::Init() {
   initialized_ = false;
+  error_detected_ = false;
   last_new_sector_ = nullptr;
   last_transaction_id_ = 0;
   entry_cache_.Reset();
@@ -79,6 +80,15 @@
 
   const size_t sector_size_bytes = partition_.sector_size_bytes();
 
+  // TODO: investigate doing this as a static assert/compile-time check.
+  if (sector_size_bytes > SectorDescriptor::max_sector_size()) {
+    ERR("KVS init failed: sector_size_bytes (=%zu) is greater than maximum "
+        "allowed sector size (=%zu)",
+        sector_size_bytes,
+        SectorDescriptor::max_sector_size());
+    return Status::FAILED_PRECONDITION;
+  }
+
   DBG("First pass: Read all entries from all sectors");
   Address sector_address = 0;
 
@@ -118,6 +128,7 @@
             SectorIndex(&sector),
             size_t(entry_address));
 
+        error_detected_ = true;
         corrupt_entries++;
 
         status = ScanForEntry(sector,
@@ -156,7 +167,8 @@
       // being written to it by indicating that it has no space. This should
       // also make it a decent GC candidate. Valid keys in the sector are still
       // readable as normal.
-      sector.set_writable_bytes(0);
+      sector.mark_corrupt();
+      error_detected_ = true;
 
       WRN("Sector %u contains %zuB of corrupt data",
           SectorIndex(&sector),
@@ -176,6 +188,9 @@
   // For every valid entry, count the valid bytes in that sector. Track which
   // entry has the newest transaction ID for initializing last_new_sector_.
   for (const EntryMetadata& metadata : entry_cache_) {
+    if (metadata.addresses().size() < redundancy()) {
+      error_detected_ = true;
+    }
     for (Address address : metadata.addresses()) {
       Entry entry;
       TRY(Entry::Read(partition_, address, formats_, &entry));
@@ -189,6 +204,15 @@
 
   last_new_sector_ = SectorFromAddress(newest_key);
 
+  if (error_detected_) {
+    Status recovery_status = Repair();
+    if (recovery_status.ok()) {
+      INF("KVS init: Corruption detected and fully repaired");
+    } else {
+      ERR("KVS init: Corruption detected and unable repair");
+    }
+  }
+
   if (!empty_sector_found) {
     // TODO: Record/report the error condition and recovery result.
     Status gc_result = GarbageCollectPartial();