pw_kvs: Transcation ID and key descriptor refactor

- Switch from per-key versions to a monotonically increasing KVS-global
  transcation ID.
- Move KeyDescriptor and the hash function to their own files.

Change-Id: I05287137579d4fe2d72c6e176969d46006c2aae6
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index 2fe9504..934b296 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -27,6 +27,7 @@
 namespace pw::kvs {
 namespace {
 
+using internal::Entry;
 using std::byte;
 using std::string_view;
 
@@ -45,8 +46,7 @@
       entry_header_format_(format),
       options_(options),
       key_descriptors_(key_descriptor_list),
-      sectors_(sector_descriptor_list),
-      last_new_sector_(sectors_.data()) {}
+      sectors_(sector_descriptor_list) {}
 
 Status KeyValueStore::Init() {
   INF("Initializing key value store");
@@ -62,10 +62,6 @@
   sectors_.resize(partition_.sector_count());
   key_descriptors_.clear();
 
-  // TODO: init last_new_sector_ to a random sector. Since the on-flash stored
-  // information does not allow recovering the previous last_new_sector_ after
-  // clean start, random is a good second choice.
-
   const size_t sector_size_bytes = partition_.sector_size_bytes();
 
   if (working_buffer_.size() < sector_size_bytes) {
@@ -132,12 +128,27 @@
   }
 
   DBG("Second pass: Count valid bytes in each sector");
+  const KeyDescriptor* newest_key = nullptr;
+  last_transaction_id_ = 0;
+
   // For every valid key, increment the valid bytes for that sector.
   for (KeyDescriptor& key_descriptor : key_descriptors_) {
     Entry entry;
-    TRY(Entry::Read(partition_, key_descriptor.address, &entry));
-    SectorFromAddress(key_descriptor.address)->AddValidBytes(entry.size());
+    TRY(Entry::Read(partition_, key_descriptor.address(), &entry));
+    SectorFromKey(key_descriptor)->AddValidBytes(entry.size());
+
+    if (key_descriptor.IsNewerThan(last_transaction_id_)) {
+      last_transaction_id_ = key_descriptor.transaction_id();
+      newest_key = &key_descriptor;
+    }
   }
+
+  if (newest_key == nullptr) {
+    last_new_sector_ = sectors_.begin();
+  } else {
+    last_new_sector_ = SectorFromKey(newest_key);
+  }
+
   initialized_ = true;
 
   INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
@@ -171,18 +182,7 @@
   const string_view key(key_buffer.data(), key_length);
 
   TRY(entry.VerifyChecksumInFlash(entry_header_format_.checksum));
-
-  KeyDescriptor key_descriptor(
-      key,
-      entry.key_version(),
-      entry_address,
-      entry.deleted() ? KeyDescriptor::kDeleted : KeyDescriptor::kValid);
-
-  DBG("Key hash: %zx (%zu)",
-      size_t(key_descriptor.key_hash),
-      size_t(key_descriptor.key_hash));
-
-  TRY(AppendNewOrOverwriteStaleExistingDescriptor(key_descriptor));
+  TRY(AppendNewOrOverwriteStaleExistingDescriptor(entry.descriptor(key)));
 
   *next_entry_address = entry.next_address();
   return Status::OK;
@@ -196,7 +196,7 @@
     const KeyDescriptor& key_descriptor) {
   // With the new key descriptor, either add it to the descriptor table or
   // overwrite an existing entry with an older version of the key.
-  KeyDescriptor* existing_descriptor = FindDescriptor(key_descriptor.key_hash);
+  KeyDescriptor* existing_descriptor = FindDescriptor(key_descriptor.hash());
 
   // Write a new entry.
   if (existing_descriptor == nullptr) {
@@ -204,15 +204,18 @@
       return Status::RESOURCE_EXHAUSTED;
     }
     key_descriptors_.push_back(key_descriptor);
-  } else if (existing_descriptor->key_version < key_descriptor.key_version) {
+  } else if (key_descriptor.IsNewerThan(
+                 existing_descriptor->transaction_id())) {
     // Existing entry is old; replace the existing entry with the new one.
     *existing_descriptor = key_descriptor;
   } else {
-    // Otherwise, check for data integrity and leave the existing entry.
-    if (existing_descriptor->key_version == key_descriptor.key_version) {
-      ERR("Data loss: Duplicated old(=%zu) and new(=%zu) version",
-          size_t(existing_descriptor->key_version),
-          size_t(key_descriptor.key_version));
+    // Otherwise, check if the entries have a duplicate transaction ID, which is
+    // not valid.
+    if (existing_descriptor->transaction_id() ==
+        key_descriptor.transaction_id()) {
+      ERR("Data loss: Duplicated old(=%zu) and new(=%zu) transaction ID",
+          size_t(existing_descriptor->transaction_id()),
+          size_t(key_descriptor.transaction_id()));
       return Status::DATA_LOSS;
     }
     DBG("Found stale entry when appending; ignoring");
@@ -222,7 +225,7 @@
 
 KeyValueStore::KeyDescriptor* KeyValueStore::FindDescriptor(uint32_t hash) {
   for (KeyDescriptor& key_descriptor : key_descriptors_) {
-    if (key_descriptor.key_hash == hash) {
+    if (key_descriptor.hash() == hash) {
       return &key_descriptor;
     }
   }
@@ -238,7 +241,7 @@
   TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
 
   Entry entry;
-  TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address, &entry));
+  TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address(), &entry));
 
   StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
   if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
@@ -276,8 +279,8 @@
 
   if (status.ok()) {
     DBG("Overwriting entry for key %#08" PRIx32 " in sector %u",
-        key_descriptor->key_hash,
-        SectorIndex(SectorFromAddress(key_descriptor->address)));
+        key_descriptor->hash(),
+        SectorIndex(SectorFromKey(key_descriptor)));
     return WriteEntryForExistingKey(
         key_descriptor, KeyDescriptor::kValid, key, value);
   }
@@ -296,8 +299,8 @@
   TRY(FindExistingKeyDescriptor(key, &key_descriptor));
 
   DBG("Writing tombstone for key %#08" PRIx32 " in sector %u",
-      key_descriptor->key_hash,
-      SectorIndex(SectorFromAddress(key_descriptor->address)));
+      key_descriptor->hash(),
+      SectorIndex(SectorFromKey(key_descriptor)));
   return WriteEntryForExistingKey(
       key_descriptor, KeyDescriptor::kDeleted, key, {});
 }
@@ -314,7 +317,7 @@
   std::memset(item_.key_buffer_.data(), 0, item_.key_buffer_.size());
 
   Entry entry;
-  if (Entry::Read(item_.kvs_.partition_, descriptor().address, &entry).ok()) {
+  if (Entry::Read(item_.kvs_.partition_, descriptor().address(), &entry).ok()) {
     entry.ReadKey(item_.key_buffer_);
   }
 
@@ -351,23 +354,11 @@
   TRY_WITH_SIZE(FindExistingKeyDescriptor(key, &key_descriptor));
 
   Entry entry;
-  TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address, &entry));
+  TRY_WITH_SIZE(Entry::Read(partition_, key_descriptor->address(), &entry));
 
   return StatusWithSize(entry.value_size());
 }
 
-uint32_t KeyValueStore::HashKey(string_view string) {
-  uint32_t hash = 0;
-  uint32_t coefficient = 65599u;
-
-  for (char ch : string) {
-    hash += coefficient * unsigned(ch);
-    coefficient *= 65599u;
-  }
-
-  return hash;
-}
-
 Status KeyValueStore::FixedSizeGet(std::string_view key,
                                    byte* value,
                                    size_t size_bytes) const {
@@ -405,13 +396,13 @@
 //
 Status KeyValueStore::FindKeyDescriptor(string_view key,
                                         const KeyDescriptor** result) const {
-  const uint32_t hash = HashKey(key);
+  const uint32_t hash = internal::Hash(key);
   Entry::KeyBuffer key_buffer;
 
   for (auto& descriptor : key_descriptors_) {
-    if (descriptor.key_hash == hash) {
+    if (descriptor.hash() == hash) {
       TRY(Entry::ReadKey(
-          partition_, descriptor.address, key.size(), key_buffer.data()));
+          partition_, descriptor.address(), key.size(), key_buffer.data()));
 
       if (key == string_view(key_buffer.data(), key.size())) {
         DBG("Found match for key hash 0x%08" PRIx32, hash);
@@ -451,8 +442,8 @@
                                                span<const byte> value) {
   // Find the original entry and sector to update the sector's valid_bytes.
   Entry original_entry;
-  TRY(Entry::Read(partition_, key_descriptor->address, &original_entry));
-  SectorDescriptor* old_sector = SectorFromAddress(key_descriptor->address);
+  TRY(Entry::Read(partition_, key_descriptor->address(), &original_entry));
+  SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
 
   SectorDescriptor* sector;
   TRY(FindOrRecoverSectorWithSpace(&sector,
@@ -461,13 +452,13 @@
       SectorIndex(sector),
       SectorBaseAddress(sector));
 
-  if (old_sector != SectorFromAddress(key_descriptor->address)) {
+  if (old_sector != SectorFromKey(key_descriptor)) {
     DBG("Sector for old entry (size %zu) was garbage collected. Old entry "
         "relocated to sector %u",
         original_entry.size(),
-        SectorIndex(SectorFromAddress(key_descriptor->address)));
+        SectorIndex(SectorFromKey(key_descriptor)));
 
-    old_sector = SectorFromAddress(key_descriptor->address);
+    old_sector = SectorFromKey(key_descriptor);
   }
 
   TRY(AppendEntry(sector, key_descriptor, key, value, new_state));
@@ -484,14 +475,14 @@
     return Status::RESOURCE_EXHAUSTED;
   }
 
-  // Create the KeyDescriptor that will be added to the list. The version and
-  // address will be set by AppendEntry.
-  KeyDescriptor key_descriptor(key, 0, 0);
-
   SectorDescriptor* sector;
   TRY(FindOrRecoverSectorWithSpace(&sector,
                                    Entry::size(partition_, key, value)));
   DBG("Writing new entry; found sector: %u", SectorIndex(sector));
+
+  // Create the KeyDescriptor that will be added to the list. The transaction ID
+  // and address will be set by AppendEntry.
+  KeyDescriptor key_descriptor(key);
   TRY(AppendEntry(sector, &key_descriptor, key, value, KeyDescriptor::kValid));
 
   // Only add the entry when we are certain the write succeeded.
@@ -512,7 +503,7 @@
   // store the key and value in the TempEntry stored in the static allocated
   // working_buffer_.
   Entry entry;
-  TRY(Entry::Read(partition_, key_descriptor.address, &entry));
+  TRY(Entry::Read(partition_, key_descriptor.address(), &entry));
   TRY_ASSIGN(size_t key_length, entry.ReadKey(temp_entry->key));
   string_view key = string_view(temp_entry->key.data(), key_length);
   auto result = entry.ReadValue(as_writable_bytes(span(temp_entry->value)));
@@ -524,13 +515,16 @@
   TRY(entry.VerifyChecksum(
       entry_header_format_.checksum, key, as_bytes(value)));
 
-  SectorDescriptor* old_sector = SectorFromAddress(key_descriptor.address);
+  SectorDescriptor* old_sector = SectorFromKey(key_descriptor);
 
   // Find a new sector for the entry and write it to the new location.
   SectorDescriptor* new_sector;
   TRY(FindSectorWithSpace(&new_sector, entry.size(), old_sector, true));
-  TRY(AppendEntry(
-      new_sector, &key_descriptor, key, as_bytes(value), key_descriptor.state));
+  TRY(AppendEntry(new_sector,
+                  &key_descriptor,
+                  key,
+                  as_bytes(value),
+                  key_descriptor.state()));
 
   // Do the valid bytes accounting for the sector the entry was relocated out
   // of.
@@ -677,7 +671,7 @@
   // Step 2: Move any valid entries in the GC sector to other sectors
   if (sector_to_gc->valid_bytes() != 0) {
     for (auto& descriptor : key_descriptors_) {
-      if (AddressInSector(*sector_to_gc, descriptor.address)) {
+      if (AddressInSector(*sector_to_gc, descriptor.address())) {
         DBG("  Relocate entry");
         TRY(RelocateEntry(descriptor));
       }
@@ -703,36 +697,16 @@
 
 Status KeyValueStore::AppendEntry(SectorDescriptor* sector,
                                   KeyDescriptor* key_descriptor,
-                                  const string_view key,
+                                  string_view key,
                                   span<const byte> value,
                                   KeyDescriptor::State new_state) {
   const Address address = NextWritableAddress(sector);
-  DBG("Appending to address: %#zx", size_t(address));
+  Entry entry = CreateEntry(address, key, value, new_state);
 
-  Entry entry;
-
-  if (new_state == KeyDescriptor::kDeleted) {
-    entry = Entry::Tombstone(partition_,
-                             address,
-                             entry_header_format_.magic,
-                             entry_header_format_.checksum,
-                             key,
-                             partition_.alignment_bytes(),
-                             key_descriptor->key_version + 1);
-  } else {
-    entry = Entry::Valid(partition_,
-                         address,
-                         entry_header_format_.magic,
-                         entry_header_format_.checksum,
-                         key,
-                         value,
-                         partition_.alignment_bytes(),
-                         key_descriptor->key_version + 1);
-  }
-
-  DBG("Appending %zu B entry with key version: %x",
+  DBG("Appending %zu B entry with transaction ID %" PRIu32 " to address %#zx",
       entry.size(),
-      unsigned(entry.key_version()));
+      entry.transaction_id(),
+      size_t(address));
 
   TRY_ASSIGN(const size_t written, entry.Write(key, value));
 
@@ -740,14 +714,36 @@
     TRY(entry.VerifyChecksumInFlash(entry_header_format_.checksum));
   }
 
-  key_descriptor->address = address;
-  key_descriptor->key_version = entry.key_version();
-  key_descriptor->state = new_state;
-
+  entry.UpdateDescriptor(key_descriptor);
   sector->MarkValidBytesWritten(written);
   return Status::OK;
 }
 
+Entry KeyValueStore::CreateEntry(Address address,
+                                 std::string_view key,
+                                 span<const byte> value,
+                                 KeyDescriptor::State state) {
+  const uint32_t transaction_id = ++last_transaction_id_;
+
+  if (state == KeyDescriptor::kDeleted) {
+    return Entry::Tombstone(partition_,
+                            address,
+                            entry_header_format_.magic,
+                            entry_header_format_.checksum,
+                            key,
+                            partition_.alignment_bytes(),
+                            transaction_id);
+  }
+  return Entry::Valid(partition_,
+                      address,
+                      entry_header_format_.magic,
+                      entry_header_format_.checksum,
+                      key,
+                      value,
+                      partition_.alignment_bytes(),
+                      transaction_id);
+}
+
 void KeyValueStore::LogDebugInfo() {
   const size_t sector_size_bytes = partition_.sector_size_bytes();
   DBG("====================== KEY VALUE STORE DUMP =========================");
@@ -769,10 +765,10 @@
     const KeyDescriptor& kd = key_descriptors_[i];
     DBG("   |%3zu: | %8zx  |%8zu  | %8zu | %8zx",
         i,
-        size_t(kd.key_hash),
-        size_t(kd.key_version),
-        size_t(kd.address),
-        size_t(kd.address));
+        size_t(kd.hash()),
+        size_t(kd.transaction_id()),
+        size_t(kd.address()),
+        size_t(kd.address()));
   }
   DBG(" ");
 
@@ -838,11 +834,11 @@
 void KeyValueStore::LogKeyDescriptor() const {
   DBG("Key descriptors: count %zu", key_descriptors_.size());
   for (auto& key : key_descriptors_) {
-    DBG("  - Key: %s, hash %#zx, version %zu, address %#zx",
+    DBG("  - Key: %s, hash %#zx, transaction ID %zu, address %#zx",
         key.deleted() ? "Deleted" : "Valid",
-        static_cast<size_t>(key.key_hash),
-        static_cast<size_t>(key.key_version),
-        static_cast<size_t>(key.address));
+        static_cast<size_t>(key.hash()),
+        static_cast<size_t>(key.transaction_id()),
+        static_cast<size_t>(key.address()));
   }
 }