pw_kvs: Transcation ID and key descriptor refactor
- Switch from per-key versions to a monotonically increasing KVS-global
transcation ID.
- Move KeyDescriptor and the hash function to their own files.
Change-Id: I05287137579d4fe2d72c6e176969d46006c2aae6
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 2fe9504..934b296 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -27,6 +27,7 @@
namespace pw::kvs {
namespace {
+using internal::Entry;
using std::byte;
using std::string_view;
@@ -45,8 +46,7 @@
entry_header_format_(format),
options_(options),
key_descriptors_(key_descriptor_list),
- sectors_(sector_descriptor_list),
- last_new_sector_(sectors_.data()) {}
+ sectors_(sector_descriptor_list) {}
Status KeyValueStore::Init() {
INF("Initializing key value store");
@@ -62,10 +62,6 @@
sectors_.resize(partition_.sector_count());
key_descriptors_.clear();
- // TODO: init last_new_sector_ to a random sector. Since the on-flash stored
- // information does not allow recovering the previous last_new_sector_ after
- // clean start, random is a good second choice.
-
const size_t sector_size_bytes = partition_.sector_size_bytes();
if (working_buffer_.size() < sector_size_bytes) {
@@ -132,12 +128,27 @@
}
DBG("Second pass: Count valid bytes in each sector");
+ const KeyDescriptor* newest_key = nullptr;
+ last_transaction_id_ = 0;
+
// For every valid key, increment the valid bytes for that sector.
for (KeyDescriptor& key_descriptor : key_descriptors_) {
Entry entry;
- TRY(Entry::Read(partition_, key_descriptor.address, &entry));
- SectorFromAddress(key_descriptor.address)->AddValidBytes(entry.size());
+ TRY(Entry::Read(partition_, key_descriptor.address(), &entry));
+ SectorFromKey(key_descriptor)->AddValidBytes(entry.size());
+
+ if (key_descriptor.IsNewerThan(last_transaction_id_)) {
+ last_transaction_id_ = key_descriptor.transaction_id();
+ newest_key = &key_descriptor;
+ }
}
+
+ if (newest_key == nullptr) {
+ last_new_sector_ = sectors_.begin();
+ } else {
+ last_new_sector_ = SectorFromKey(newest_key);
+ }
+
initialized_ = true;
INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
@@ -171,18 +182,7 @@
const string_view key(key_buffer.data(), key_length);
TRY(entry.VerifyChecksumInFlash(entry_header_format_.checksum));
-
- KeyDescriptor key_descriptor(
- key,
- entry.key_version(),
- entry_address,
- entry.deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
-
- DBG("Key hash: %zx (%zu)",
- size_t(key_descriptor.key_hash),
- size_t(key_descriptor.key_hash));
-
- TRY(AppendNewOrOverwriteStaleExistingDescriptor(key_descriptor));
+ TRY(AppendNewOrOverwriteStaleExistingDescriptor(entry.descriptor(key)));
*next_entry_address = entry.next_address();
return Status::OK;
@@ -196,7 +196,7 @@
const KeyDescriptor& key_descriptor) {
// With the new key descriptor, either add it to the descriptor table or
// overwrite an existing entry with an older version of the key.
- KeyDescriptor* existing_descriptor = FindDescriptor(key_descriptor.key_hash);
+ KeyDescriptor* existing_descriptor = FindDescriptor(key_descriptor.hash());
// Write a new entry.
if (existing_descriptor == nullptr) {
@@ -204,15 +204,18 @@
return Status::RESOURCE_EXHAUSTED;
}
key_descriptors_.push_back(key_descriptor);
- } else if (existing_descriptor->key_version < key_descriptor.key_version) {
+ } else if (key_descriptor.IsNewerThan(
+ existing_descriptor->transaction_id())) {
// Existing entry is old; replace the existing entry with the new one.
*existing_descriptor = key_descriptor;
} else {
- // Otherwise, check for data integrity and leave the existing entry.
- if (existing_descriptor->key_version == key_descriptor.key_version) {
- ERR("Data loss: Duplicated old(=%zu) and new(=%zu) version",
- size_t(existing_descriptor->key_version),
- size_t(key_descriptor.key_version));
+ // Otherwise, check if the entries have a duplicate transaction ID, which is
+ // not valid.
+ if (existing_descriptor->transaction_id() ==
+ key_descriptor.transaction_id()) {
+ ERR("Data loss: Duplicated old(=%zu) and new(=%zu) transaction ID",
+ size_t(existing_descriptor->transaction_id()),
+ size_t(key_descriptor.transaction_id()));
return Status::DATA_LOSS;
}
DBG("Found stale entry when appending; ignoring");
@@ -222,7 +225,7 @@
KeyValueStore::KeyDescriptor* KeyValueStore::FindDescriptor(uint32_t hash) {
for (KeyDescriptor& key_descriptor : key_descriptors_) {
- if (key_descriptor.key_hash == hash) {
+ if (key_descriptor.hash() == hash) {
return &key_descriptor;
}
}
@@ -238,7 +241,7 @@
TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
Entry entry;
- TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address, &entry));
+ TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address(), &entry));
StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
@@ -276,8 +279,8 @@
if (status.ok()) {
DBG("Overwriting entry for key %#08" PRIx32 " in sector %u",
- key_descriptor->key_hash,
- SectorIndex(SectorFromAddress(key_descriptor->address)));
+ key_descriptor->hash(),
+ SectorIndex(SectorFromKey(key_descriptor)));
return WriteEntryForExistingKey(
key_descriptor, KeyDescriptor::kValid, key, value);
}
@@ -296,8 +299,8 @@
TRY(FindExistingKeyDescriptor(key, &key_descriptor));
DBG("Writing tombstone for key %#08" PRIx32 " in sector %u",
- key_descriptor->key_hash,
- SectorIndex(SectorFromAddress(key_descriptor->address)));
+ key_descriptor->hash(),
+ SectorIndex(SectorFromKey(key_descriptor)));
return WriteEntryForExistingKey(
key_descriptor, KeyDescriptor::kDeleted, key, {});
}
@@ -314,7 +317,7 @@
std::memset(item_.key_buffer_.data(), 0, item_.key_buffer_.size());
Entry entry;
- if (Entry::Read(item_.kvs_.partition_, descriptor().address, &entry).ok()) {
+ if (Entry::Read(item_.kvs_.partition_, descriptor().address(), &entry).ok()) {
entry.ReadKey(item_.key_buffer_);
}
@@ -351,23 +354,11 @@
TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
Entry entry;
- TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address, &entry));
+ TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address(), &entry));
return StatusWithSize(entry.value_size());
}
-uint32_t KeyValueStore::HashKey(string_view string) {
- uint32_t hash = 0;
- uint32_t coefficient = 65599u;
-
- for (char ch : string) {
- hash += coefficient * unsigned(ch);
- coefficient *= 65599u;
- }
-
- return hash;
-}
-
Status KeyValueStore::FixedSizeGet(std::string_view key,
byte* value,
size_t size_bytes) const {
@@ -405,13 +396,13 @@
//
Status KeyValueStore::FindKeyDescriptor(string_view key,
const KeyDescriptor** result) const {
- const uint32_t hash = HashKey(key);
+ const uint32_t hash = internal::Hash(key);
Entry::KeyBuffer key_buffer;
for (auto& descriptor : key_descriptors_) {
- if (descriptor.key_hash == hash) {
+ if (descriptor.hash() == hash) {
TRY(Entry::ReadKey(
- partition_, descriptor.address, key.size(), key_buffer.data()));
+ partition_, descriptor.address(), key.size(), key_buffer.data()));
if (key == string_view(key_buffer.data(), key.size())) {
DBG("Found match for key hash 0x%08" PRIx32, hash);
@@ -451,8 +442,8 @@
span<const byte> value) {
// Find the original entry and sector to update the sector's valid_bytes.
Entry original_entry;
- TRY(Entry::Read(partition_, key_descriptor->address, &original_entry));
- SectorDescriptor* old_sector = SectorFromAddress(key_descriptor->address);
+ TRY(Entry::Read(partition_, key_descriptor->address(), &original_entry));
+ SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
SectorDescriptor* sector;
TRY(FindOrRecoverSectorWithSpace(§or,
@@ -461,13 +452,13 @@
SectorIndex(sector),
SectorBaseAddress(sector));
- if (old_sector != SectorFromAddress(key_descriptor->address)) {
+ if (old_sector != SectorFromKey(key_descriptor)) {
DBG("Sector for old entry (size %zu) was garbage collected. Old entry "
"relocated to sector %u",
original_entry.size(),
- SectorIndex(SectorFromAddress(key_descriptor->address)));
+ SectorIndex(SectorFromKey(key_descriptor)));
- old_sector = SectorFromAddress(key_descriptor->address);
+ old_sector = SectorFromKey(key_descriptor);
}
TRY(AppendEntry(sector, key_descriptor, key, value, new_state));
@@ -484,14 +475,14 @@
return Status::RESOURCE_EXHAUSTED;
}
- // Create the KeyDescriptor that will be added to the list. The version and
- // address will be set by AppendEntry.
- KeyDescriptor key_descriptor(key, 0, 0);
-
SectorDescriptor* sector;
TRY(FindOrRecoverSectorWithSpace(§or,
Entry::size(partition_, key, value)));
DBG("Writing new entry; found sector: %u", SectorIndex(sector));
+
+ // Create the KeyDescriptor that will be added to the list. The transaction ID
+ // and address will be set by AppendEntry.
+ KeyDescriptor key_descriptor(key);
TRY(AppendEntry(sector, &key_descriptor, key, value, KeyDescriptor::kValid));
// Only add the entry when we are certain the write succeeded.
@@ -512,7 +503,7 @@
// store the key and value in the TempEntry stored in the static allocated
// working_buffer_.
Entry entry;
- TRY(Entry::Read(partition_, key_descriptor.address, &entry));
+ TRY(Entry::Read(partition_, key_descriptor.address(), &entry));
TRY_ASSIGN(size_t key_length, entry.ReadKey(temp_entry->key));
string_view key = string_view(temp_entry->key.data(), key_length);
auto result = entry.ReadValue(as_writable_bytes(span(temp_entry->value)));
@@ -524,13 +515,16 @@
TRY(entry.VerifyChecksum(
entry_header_format_.checksum, key, as_bytes(value)));
- SectorDescriptor* old_sector = SectorFromAddress(key_descriptor.address);
+ SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
// Find a new sector for the entry and write it to the new location.
SectorDescriptor* new_sector;
TRY(FindSectorWithSpace(&new_sector, entry.size(), old_sector, true));
- TRY(AppendEntry(
- new_sector, &key_descriptor, key, as_bytes(value), key_descriptor.state));
+ TRY(AppendEntry(new_sector,
+ &key_descriptor,
+ key,
+ as_bytes(value),
+ key_descriptor.state()));
// Do the valid bytes accounting for the sector the entry was relocated out
// of.
@@ -677,7 +671,7 @@
// Step 2: Move any valid entries in the GC sector to other sectors
if (sector_to_gc->valid_bytes() != 0) {
for (auto& descriptor : key_descriptors_) {
- if (AddressInSector(*sector_to_gc, descriptor.address)) {
+ if (AddressInSector(*sector_to_gc, descriptor.address())) {
DBG(" Relocate entry");
TRY(RelocateEntry(descriptor));
}
@@ -703,36 +697,16 @@
Status KeyValueStore::AppendEntry(SectorDescriptor* sector,
KeyDescriptor* key_descriptor,
- const string_view key,
+ string_view key,
span<const byte> value,
KeyDescriptor::State new_state) {
const Address address = NextWritableAddress(sector);
- DBG("Appending to address: %#zx", size_t(address));
+ Entry entry = CreateEntry(address, key, value, new_state);
- Entry entry;
-
- if (new_state == KeyDescriptor::kDeleted) {
- entry = Entry::Tombstone(partition_,
- address,
- entry_header_format_.magic,
- entry_header_format_.checksum,
- key,
- partition_.alignment_bytes(),
- key_descriptor->key_version + 1);
- } else {
- entry = Entry::Valid(partition_,
- address,
- entry_header_format_.magic,
- entry_header_format_.checksum,
- key,
- value,
- partition_.alignment_bytes(),
- key_descriptor->key_version + 1);
- }
-
- DBG("Appending %zu B entry with key version: %x",
+ DBG("Appending %zu B entry with transaction ID %" PRIu32 " to address %#zx",
entry.size(),
- unsigned(entry.key_version()));
+ entry.transaction_id(),
+ size_t(address));
TRY_ASSIGN(const size_t written, entry.Write(key, value));
@@ -740,14 +714,36 @@
TRY(entry.VerifyChecksumInFlash(entry_header_format_.checksum));
}
- key_descriptor->address = address;
- key_descriptor->key_version = entry.key_version();
- key_descriptor->state = new_state;
-
+ entry.UpdateDescriptor(key_descriptor);
sector->MarkValidBytesWritten(written);
return Status::OK;
}
+Entry KeyValueStore::CreateEntry(Address address,
+ std::string_view key,
+ span<const byte> value,
+ KeyDescriptor::State state) {
+ const uint32_t transaction_id = ++last_transaction_id_;
+
+ if (state == KeyDescriptor::kDeleted) {
+ return Entry::Tombstone(partition_,
+ address,
+ entry_header_format_.magic,
+ entry_header_format_.checksum,
+ key,
+ partition_.alignment_bytes(),
+ transaction_id);
+ }
+ return Entry::Valid(partition_,
+ address,
+ entry_header_format_.magic,
+ entry_header_format_.checksum,
+ key,
+ value,
+ partition_.alignment_bytes(),
+ transaction_id);
+}
+
void KeyValueStore::LogDebugInfo() {
const size_t sector_size_bytes = partition_.sector_size_bytes();
DBG("====================== KEY VALUE STORE DUMP =========================");
@@ -769,10 +765,10 @@
const KeyDescriptor& kd = key_descriptors_[i];
DBG(" |%3zu: | %8zx |%8zu | %8zu | %8zx",
i,
- size_t(kd.key_hash),
- size_t(kd.key_version),
- size_t(kd.address),
- size_t(kd.address));
+ size_t(kd.hash()),
+ size_t(kd.transaction_id()),
+ size_t(kd.address()),
+ size_t(kd.address()));
}
DBG(" ");
@@ -838,11 +834,11 @@
void KeyValueStore::LogKeyDescriptor() const {
DBG("Key descriptors: count %zu", key_descriptors_.size());
for (auto& key : key_descriptors_) {
- DBG(" - Key: %s, hash %#zx, version %zu, address %#zx",
+ DBG(" - Key: %s, hash %#zx, transaction ID %zu, address %#zx",
key.deleted() ? "Deleted" : "Valid",
- static_cast<size_t>(key.key_hash),
- static_cast<size_t>(key.key_version),
- static_cast<size_t>(key.address));
+ static_cast<size_t>(key.hash()),
+ static_cast<size_t>(key.transaction_id()),
+ static_cast<size_t>(key.address()));
}
}