pw_kvs: Add better error handling during read, write, and init

- When reading a key entry, use all of the redundant copies before
  failing.
- Add CopyEntryToSector helper that makes a copy of a key to another
  sector.
- Move FindExisting method and add wrapper for Find from EntryCache in
  mto KVS to allow the methods to report errors that are found.

Change-Id: Ia2da21e4ec3ad164b8620aa3949c524c0d916835
diff --git a/pw_kvs/entry_cache.cc b/pw_kvs/entry_cache.cc
index 6d80bb6..4528ab2 100644
--- a/pw_kvs/entry_cache.cc
+++ b/pw_kvs/entry_cache.cc
@@ -58,42 +58,62 @@
   addresses_ = addresses_.first(1);
 }
 
-Status EntryCache::Find(FlashPartition& partition,
-                        string_view key,
-                        EntryMetadata* metadata) const {
+StatusWithSize EntryCache::Find(FlashPartition& partition,
+                                const Sectors& sectors,
+                                const EntryFormats& formats,
+                                string_view key,
+                                EntryMetadata* metadata) const {
   const uint32_t hash = internal::Hash(key);
   Entry::KeyBuffer key_buffer;
+  bool error_detected = false;
 
   for (size_t i = 0; i < descriptors_.size(); ++i) {
     if (descriptors_[i].key_hash == hash) {
-      TRY(Entry::ReadKey(
-          partition, *first_address(i), key.size(), key_buffer.data()));
+      bool key_found = false;
+      string_view read_key;
 
-      if (key == string_view(key_buffer.data(), key.size())) {
+      for (Address address : addresses(i)) {
+        Status read_result =
+            Entry::ReadKey(partition, address, key.size(), key_buffer.data());
+
+        read_key = string_view(key_buffer.data(), key.size());
+
+        if (read_result.ok() && hash == internal::Hash(read_key)) {
+          key_found = true;
+          break;
+        } else {
+          // A hash mismatch can be caused by reading invalid data or a key hash
+          // collision of keys with differing size. To verify the data read from
+          // flash is good, validate the entry.
+          Entry entry;
+          read_result = Entry::Read(partition, address, formats, &entry);
+          if (read_result.ok() && entry.VerifyChecksumInFlash().ok()) {
+            key_found = true;
+            break;
+          }
+
+          PW_LOG_WARN(
+              "   Found corrupt entry, invalidating this copy of the key");
+          error_detected = true;
+          sectors.FromAddress(address).mark_corrupt();
+        }
+      }
+      size_t error_val = error_detected ? 1 : 0;
+
+      if (!key_found) {
+        PW_LOG_ERROR("No valid entries for key. Data has been lost!");
+        return StatusWithSize(Status::DATA_LOSS, error_val);
+      } else if (key == read_key) {
         PW_LOG_DEBUG("Found match for key hash 0x%08" PRIx32, hash);
         *metadata = EntryMetadata(descriptors_[i], addresses(i));
-        return Status::OK;
+        return StatusWithSize(Status::OK, error_val);
       } else {
         PW_LOG_WARN("Found key hash collision for 0x%08" PRIx32, hash);
-        return Status::ALREADY_EXISTS;
+        return StatusWithSize(Status::ALREADY_EXISTS, error_val);
       }
     }
   }
-  return Status::NOT_FOUND;
-}
-
-Status EntryCache::FindExisting(FlashPartition& partition,
-                                string_view key,
-                                EntryMetadata* metadata) const {
-  Status status = Find(partition, key, metadata);
-
-  // If the key's hash collides with an existing key or if the key is deleted,
-  // treat it as if it is not in the KVS.
-  if (status == Status::ALREADY_EXISTS ||
-      (status.ok() && metadata->state() == EntryState::kDeleted)) {
-    return Status::NOT_FOUND;
-  }
-  return status;
+  return StatusWithSize::NOT_FOUND;
 }
 
 EntryMetadata EntryCache::AddNew(const KeyDescriptor& descriptor,