pw_kvs: Handle key hash collisions

- Log and return ALREADY_EXISTS if a Put operation introduces a key
  whose hash collides with another key already known to the KVS.
- Add tests to cover key hash collisions with Put, Get, ValueSize, and
  Delete.

Change-Id: I957e7c4998f700517b70d3b999f0e3f4f4621f6d
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index fd54654..85565b9 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -250,11 +250,7 @@
   TRY_WITH_SIZE(CheckOperation(key));
 
   const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindKeyDescriptor(key, &key_descriptor));
-
-  if (key_descriptor->deleted()) {
-    return StatusWithSize(Status::NOT_FOUND);
-  }
+  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
 
   Entry header;
   TRY_WITH_SIZE(ReadEntryHeader(key_descriptor->address, &header));
@@ -288,7 +284,9 @@
   }
 
   KeyDescriptor* key_descriptor;
-  if (FindKeyDescriptor(key, &key_descriptor).ok()) {
+  Status status = FindKeyDescriptor(key, &key_descriptor);
+
+  if (status.ok()) {
     DBG("Writing over existing entry for key 0x%08" PRIx32 " in sector %zu",
         key_descriptor->key_hash,
         SectorIndex(SectorFromAddress(key_descriptor->address)));
@@ -296,18 +294,18 @@
         key_descriptor, KeyDescriptor::kValid, key, value);
   }
 
-  return WriteEntryForNewKey(key, value);
+  if (status == Status::NOT_FOUND) {
+    return WriteEntryForNewKey(key, value);
+  }
+
+  return status;
 }
 
 Status KeyValueStore::Delete(string_view key) {
   TRY(CheckOperation(key));
 
   KeyDescriptor* key_descriptor;
-  TRY(FindKeyDescriptor(key, &key_descriptor));
-
-  if (key_descriptor->deleted()) {
-    return Status::NOT_FOUND;
-  }
+  TRY(FindExistingKeyDescriptor(key, &key_descriptor));
 
   DBG("Writing tombstone for existing key 0x%08" PRIx32 " in sector %zu",
       key_descriptor->key_hash,
@@ -363,11 +361,7 @@
   TRY_WITH_SIZE(CheckOperation(key));
 
   const KeyDescriptor* key_descriptor;
-  TRY_WITH_SIZE(FindKeyDescriptor(key, &key_descriptor));
-
-  if (key_descriptor->deleted()) {
-    return StatusWithSize(Status::NOT_FOUND);
-  }
+  TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
 
   Entry header;
   TRY_WITH_SIZE(ReadEntryHeader(key_descriptor->address, &header));
@@ -413,6 +407,15 @@
   return Status::OK;
 }
 
+// Searches for a KeyDescriptor that matches this key and sets *result to point
+// to it if one is found.
+//
+//             OK: there is a matching descriptor and *result is set
+//      NOT_FOUND: there is no descriptor that matches this key, but this key
+//                 has a unique hash (and could potentially be added to the KVS)
+// ALREADY_EXISTS: there is no descriptor that matches this key, but the
+//                 key's hash collides with the hash for an existing descriptor
+//
 Status KeyValueStore::FindKeyDescriptor(string_view key,
                                         const KeyDescriptor** result) const {
   char key_buffer[kMaxKeyLength];
@@ -426,12 +429,34 @@
         DBG("Found match for key hash 0x%08" PRIx32, hash);
         *result = &descriptor;
         return Status::OK;
+      } else {
+        WRN("Found key hash collision for 0x%08" PRIx32, hash);
+        return Status::ALREADY_EXISTS;
       }
     }
   }
   return Status::NOT_FOUND;
 }
 
+// Searches for a KeyDescriptor that matches this key and is not deleted, and
+// sets *result to point to it if one is found.
+//
+//          OK: there is a matching, non-deleted descriptor; *result is set
+//   NOT_FOUND: no descriptor matches this key (a hash collision counts as no
+//              match), or the matching descriptor is marked deleted
+Status KeyValueStore::FindExistingKeyDescriptor(
+    string_view key, const KeyDescriptor** result) const {
+  Status status = FindKeyDescriptor(key, result);
+
+  // If the key's hash collides with an existing key or if the key is deleted,
+  // treat it as if it is not in the KVS.
+  if (status == Status::ALREADY_EXISTS ||
+      (status.ok() && (*result)->deleted())) {
+    return Status::NOT_FOUND;
+  }
+  return status;
+}
+
 Status KeyValueStore::ReadEntryHeader(Address address, Entry* header) const {
   return partition_.Read(address, sizeof(*header), header).status();
 }
@@ -646,10 +671,7 @@
 Status KeyValueStore::FindOrRecoverSectorWithSpace(SectorDescriptor** sector,
                                                    size_t size) {
   Status result = FindSectorWithSpace(sector, size);
-  if (result.ok()) {
-    return result;
-  }
-  if (options_.partial_gc_on_write) {
+  if (result == Status::RESOURCE_EXHAUSTED && options_.partial_gc_on_write) {
     // Garbage collect and then try again to find the best sector.
     TRY(GarbageCollectOneSector());
     return FindSectorWithSpace(sector, size);