pw_kvs: Handle key hash collisions
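- Log and return ALREADY_EXISTS if a Put operation introduces a key
whose hash collides with another key already known to the KVS.
- Return NOT_FOUND from operations that require an existing key (such as
Delete) when the key only collides with the hash of a stored key.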
- Add tests to cover key hash collisions with Put, Get, ValueSize, and
Delete.
Change-Id: I957e7c4998f700517b70d3b999f0e3f4f4621f6d
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index fd54654..85565b9 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -250,11 +250,7 @@
TRY_WITH_SIZE(CheckOperation(key));
const KeyDescriptor* key_descriptor;
- TRY_WITH_SIZE(FindKeyDescriptor(key, &key_descriptor));
-
- if (key_descriptor->deleted()) {
- return StatusWithSize(Status::NOT_FOUND);
- }
+ TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
Entry header;
TRY_WITH_SIZE(ReadEntryHeader(key_descriptor->address, &header));
@@ -288,7 +284,9 @@
}
KeyDescriptor* key_descriptor;
- if (FindKeyDescriptor(key, &key_descriptor).ok()) {
+ Status status = FindKeyDescriptor(key, &key_descriptor);
+
+ if (status.ok()) {
DBG("Writing over existing entry for key 0x%08" PRIx32 " in sector %zu",
key_descriptor->key_hash,
SectorIndex(SectorFromAddress(key_descriptor->address)));
@@ -296,18 +294,18 @@
key_descriptor, KeyDescriptor::kValid, key, value);
}
- return WriteEntryForNewKey(key, value);
+ if (status == Status::NOT_FOUND) {
+ return WriteEntryForNewKey(key, value);
+ }
+
+ return status;
}
Status KeyValueStore::Delete(string_view key) {
TRY(CheckOperation(key));
KeyDescriptor* key_descriptor;
- TRY(FindKeyDescriptor(key, &key_descriptor));
-
- if (key_descriptor->deleted()) {
- return Status::NOT_FOUND;
- }
+ TRY(FindExistingKeyDescriptor(key, &key_descriptor));
DBG("Writing tombstone for existing key 0x%08" PRIx32 " in sector %zu",
key_descriptor->key_hash,
@@ -363,11 +361,7 @@
TRY_WITH_SIZE(CheckOperation(key));
const KeyDescriptor* key_descriptor;
- TRY_WITH_SIZE(FindKeyDescriptor(key, &key_descriptor));
-
- if (key_descriptor->deleted()) {
- return StatusWithSize(Status::NOT_FOUND);
- }
+ TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
Entry header;
TRY_WITH_SIZE(ReadEntryHeader(key_descriptor->address, &header));
@@ -413,6 +407,15 @@
return Status::OK;
}
+// Searches for a KeyDescriptor that matches this key and sets *result to point
+// to it if one is found.
+//
+// OK: there is a matching descriptor (possibly deleted) and *result is set
+// NOT_FOUND: there is no descriptor that matches this key, but this key
+// has a unique hash (and could potentially be added to the KVS)
+// ALREADY_EXISTS: there is no descriptor that matches this key, but the
+// key's hash collides with the hash for an existing descriptor
+//
Status KeyValueStore::FindKeyDescriptor(string_view key,
const KeyDescriptor** result) const {
char key_buffer[kMaxKeyLength];
@@ -426,12 +429,34 @@
DBG("Found match for key hash 0x%08" PRIx32, hash);
*result = &descriptor;
return Status::OK;
+ } else {
+ WRN("Found key hash collision for 0x%08" PRIx32, hash);
+ return Status::ALREADY_EXISTS;
}
}
}
return Status::NOT_FOUND;
}
+// Searches for a non-deleted KeyDescriptor that matches this key and sets
+// *result to point to it if one is found.
+//
+// OK: there is a matching, non-deleted descriptor and *result is set
+// NOT_FOUND: key is absent, deleted, or only hash-collides with another key
+//
+Status KeyValueStore::FindExistingKeyDescriptor(
+ string_view key, const KeyDescriptor** result) const {
+ Status status = FindKeyDescriptor(key, result);
+
+ // If the key's hash collides with an existing key or if the key is deleted,
+ // treat it as if it is not in the KVS.
+ if (status == Status::ALREADY_EXISTS ||
+ (status.ok() && (*result)->deleted())) {
+ return Status::NOT_FOUND;
+ }
+ return status;
+}
+
Status KeyValueStore::ReadEntryHeader(Address address, Entry* header) const {
return partition_.Read(address, sizeof(*header), header).status();
}
@@ -646,10 +671,7 @@
Status KeyValueStore::FindOrRecoverSectorWithSpace(SectorDescriptor** sector,
size_t size) {
Status result = FindSectorWithSpace(sector, size);
- if (result.ok()) {
- return result;
- }
- if (options_.partial_gc_on_write) {
+ if (result == Status::RESOURCE_EXHAUSTED && options_.partial_gc_on_write) {
// Garbage collect and then try again to find the best sector.
TRY(GarbageCollectOneSector());
return FindSectorWithSpace(sector, size);