pw_kvs: Add KVS error repair/recovery
- Add repair of detected errors in the KVS.
- Add additional error detection in KVS init
- KVS initialization has a new "needs maintenance" state. This is a
read-only mode because not all of the required KVS system invariants
have been met.
Change-Id: If13094855d8da03d59011891c8ed5af2ad38ad2d
diff --git a/pw_kvs/key_value_store.cc b/pw_kvs/key_value_store.cc
index d0ab512..9fa7199 100644
--- a/pw_kvs/key_value_store.cc
+++ b/pw_kvs/key_value_store.cc
@@ -48,13 +48,15 @@
sectors_(sector_descriptor_list, *partition, temp_sectors_to_skip),
entry_cache_(key_descriptor_list, addresses, redundancy),
options_(options),
- initialized_(false),
+ initialized_(InitializationState::kNotInitialized),
error_detected_(false),
+ error_stats_({}),
last_transaction_id_(0) {}
Status KeyValueStore::Init() {
- initialized_ = false;
+ initialized_ = InitializationState::kNotInitialized;
error_detected_ = false;
+ error_stats_ = {};
last_transaction_id_ = 0;
sectors_.Reset();
entry_cache_.Reset();
@@ -191,26 +193,34 @@
sectors_.set_last_new_sector(newest_key);
- if (error_detected_) {
- Status recovery_status = Repair();
- if (recovery_status.ok()) {
- INF("KVS init: Corruption detected and fully repaired");
- } else {
- ERR("KVS init: Corruption detected and unable repair");
- }
- }
-
if (!empty_sector_found) {
- // TODO: Record/report the error condition and recovery result.
- Status gc_result = GarbageCollectPartial();
-
- if (!gc_result.ok()) {
- ERR("KVS init failed: Unable to maintain required free sector");
- return Status::INTERNAL;
- }
+ error_detected_ = true;
}
- initialized_ = true;
+ if (!error_detected_) {
+ initialized_ = InitializationState::kReady;
+ } else {
+ if (options_.recovery != ErrorRecovery::kManual) {
+ WRN("KVS init: Corruption detected, beginning repair");
+ Status recovery_status = Repair();
+
+ if (recovery_status.ok()) {
+ WRN("KVS init: Corruption detected and fully repaired");
+ initialized_ = InitializationState::kReady;
+ total_corrupt_bytes = 0;
+ corrupt_entries = 0;
+ } else if (recovery_status == Status::RESOURCE_EXHAUSTED) {
+ WRN("KVS init: Unable to maintain required free sector");
+ initialized_ = InitializationState::kNeedsMaintenance;
+ } else {
+ WRN("KVS init: Corruption detected and unable repair");
+ initialized_ = InitializationState::kNeedsMaintenance;
+ }
+ } else {
+ WRN("KVS init: Corruption detected, no repair attempted due to options");
+ initialized_ = InitializationState::kNeedsMaintenance;
+ }
+ }
INF("KeyValueStore init complete: active keys %zu, deleted keys %zu, sectors "
"%zu, logical sector size %zu bytes",
@@ -219,6 +229,7 @@
sectors_.size(),
partition_.sector_size_bytes());
+ // Report any corruption that was not repaired.
if (total_corrupt_bytes > 0) {
WRN("Found %zu corrupt bytes and %d corrupt entries during init process; "
"some keys may be missing",
@@ -231,9 +242,12 @@
}
KeyValueStore::StorageStats KeyValueStore::GetStorageStats() const {
- StorageStats stats{0, 0, 0};
+ StorageStats stats{};
const size_t sector_size = partition_.sector_size_bytes();
bool found_empty_sector = false;
+ stats.corrupt_sectors_recovered = error_stats_.corrupt_sectors_recovered;
+ stats.missing_redundant_entries_recovered =
+ error_stats_.missing_redundant_entries_recovered;
for (const SectorDescriptor& sector : sectors_) {
stats.in_use_bytes += sector.valid_bytes();
@@ -255,6 +269,28 @@
return stats;
}
+// Scans the in-memory sector descriptors and the entry cache for outstanding
+// problems and latches error_detected_ when one is found. This function only
+// ever sets error_detected_; it never clears it.
+//
+// Returns the value of error_detected() after the scan.
+bool KeyValueStore::CheckForErrors() {
+  // A single sector flagged as corrupt is enough to record an error.
+  for (SectorDescriptor& candidate : sectors_) {
+    if (!candidate.corrupt()) {
+      continue;
+    }
+    error_detected_ = true;
+    break;
+  }
+
+  // When redundancy is in use, every key must have redundancy() stored
+  // copies; the first key with fewer copies records an error.
+  const bool redundancy_in_use = redundancy() > 1;
+  if (redundancy_in_use) {
+    for (const EntryMetadata& key_metadata : entry_cache_) {
+      const bool missing_copies =
+          key_metadata.addresses().size() < redundancy();
+      if (missing_copies) {
+        error_detected_ = true;
+        break;
+      }
+    }
+  }
+
+  return error_detected();
+}
+
Status KeyValueStore::LoadEntry(Address entry_address,
Address* next_entry_address) {
Entry entry;
@@ -304,7 +340,7 @@
StatusWithSize KeyValueStore::Get(string_view key,
span<byte> value_buffer,
size_t offset_bytes) const {
- TRY_WITH_SIZE(CheckOperation(key));
+ TRY_WITH_SIZE(CheckReadOperation(key));
EntryMetadata metadata;
TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -313,12 +349,11 @@
}
Status KeyValueStore::PutBytes(string_view key, span<const byte> value) {
+ TRY(CheckWriteOperation(key));
DBG("Writing key/value; key length=%zu, value length=%zu",
key.size(),
value.size());
- TRY(CheckOperation(key));
-
if (Entry::size(partition_, key, value) > partition_.sector_size_bytes()) {
DBG("%zu B value with %zu B key cannot fit in one sector",
value.size(),
@@ -346,7 +381,7 @@
}
Status KeyValueStore::Delete(string_view key) {
- TRY(CheckOperation(key));
+ TRY(CheckWriteOperation(key));
EntryMetadata metadata;
TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -391,7 +426,7 @@
}
StatusWithSize KeyValueStore::ValueSize(string_view key) const {
- TRY_WITH_SIZE(CheckOperation(key));
+ TRY_WITH_SIZE(CheckReadOperation(key));
EntryMetadata metadata;
TRY_WITH_SIZE(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -426,7 +461,7 @@
Status KeyValueStore::FixedSizeGet(std::string_view key,
void* value,
size_t size_bytes) const {
- TRY(CheckOperation(key));
+ TRY(CheckWriteOperation(key));
EntryMetadata metadata;
TRY(entry_cache_.FindExisting(partition_, key, &metadata));
@@ -463,16 +498,31 @@
return StatusWithSize(entry.value_size());
}
-Status KeyValueStore::CheckOperation(string_view key) const {
+// Precondition check run at the start of operations that go through the write
+// path (PutBytes and Delete call it before doing any work).
+//
+// Returns INVALID_ARGUMENT when InvalidKey(key) is true, FAILED_PRECONDITION
+// when initialized() is false, and OK otherwise.
+//
+// NOTE(review): the const FixedSizeGet() overload also calls this check rather
+// than CheckReadOperation(); confirm that a fixed-size read is meant to be
+// rejected while the KVS is not fully ready.
+Status KeyValueStore::CheckWriteOperation(string_view key) const {
if (InvalidKey(key)) {
return Status::INVALID_ARGUMENT;
}
+
+ // For normal write operation the KVS must be fully ready.
+ // NOTE(review): presumably initialized() is true only in the kReady state,
+ // which would make kNeedsMaintenance read-only - confirm against the header.
if (!initialized()) {
return Status::FAILED_PRECONDITION;
}
return Status::OK;
}
+// Precondition check for operations that only read from the KVS.
+//
+// Returns INVALID_ARGUMENT when InvalidKey(key) is true, FAILED_PRECONDITION
+// when the KVS is still in the kNotInitialized state, and OK otherwise.
+Status KeyValueStore::CheckReadOperation(string_view key) const {
+  if (InvalidKey(key)) {
+    return Status::INVALID_ARGUMENT;
+  }
+
+  // Reads are permitted in every state other than kNotInitialized, so they
+  // keep working after init even when the KVS still needs maintenance.
+  const bool init_has_run =
+      initialized_ != InitializationState::kNotInitialized;
+  if (init_has_run) {
+    return Status::OK;
+  }
+  return Status::FAILED_PRECONDITION;
+}
+
Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
EntryState new_state,
string_view key,
@@ -575,7 +625,7 @@
do_auto_gc = false;
}
// Garbage collect and then try again to find the best sector.
- Status gc_status = GarbageCollectPartial(reserved);
+ Status gc_status = GarbageCollect(reserved);
if (!gc_status.ok()) {
if (gc_status == Status::NOT_FOUND) {
// Not enough space, and no reclaimable bytes, this KVS is full!
@@ -603,6 +653,16 @@
return result;
}
+// Pass-through helper for the result of a flash operation on `sector`.
+//
+// When `status` is not OK, the sector is marked corrupt and error_detected_ is
+// set. `status` is always returned unchanged, so the call can be wrapped
+// directly in TRY().
+Status KeyValueStore::MarkSectorCorruptIfNotOk(Status status,
+                                               SectorDescriptor* sector) {
+  if (status.ok()) {
+    return status;
+  }
+
+  DBG(" Sector %u corrupt", sectors_.Index(sector));
+  sector->mark_corrupt();
+  error_detected_ = true;
+  return status;
+}
+
Status KeyValueStore::AppendEntry(const Entry& entry,
string_view key,
span<const byte> value) {
@@ -618,11 +678,11 @@
entry.size(),
size_t(entry.address()),
result.size());
- return result.status();
+ TRY(MarkSectorCorruptIfNotOk(result.status(), &sector));
}
if (options_.verify_on_write) {
- TRY(entry.VerifyChecksumInFlash());
+ TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), &sector));
}
sector.AddValidBytes(result.size());
@@ -648,20 +708,32 @@
const Address new_address = sectors_.NextWritableAddress(*new_sector);
const StatusWithSize result = entry.Copy(new_address);
- new_sector->RemoveWritableBytes(result.size());
- TRY(result);
+ TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+ if (options_.verify_on_write) {
+ TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), new_sector));
+ }
// Entry was written successfully; update descriptor's address and the sector
// descriptors to reflect the new entry.
- sectors_.FromAddress(address).RemoveValidBytes(result.size());
+ new_sector->RemoveWritableBytes(result.size());
new_sector->AddValidBytes(result.size());
+ sectors_.FromAddress(address).RemoveValidBytes(result.size());
address = new_address;
return Status::OK;
}
-Status KeyValueStore::GarbageCollectFull() {
- DBG("Garbage Collect all sectors");
+Status KeyValueStore::FullMaintenance() {
+ if (initialized_ == InitializationState::kNotInitialized) {
+ return Status::FAILED_PRECONDITION;
+ }
+
+ DBG("Do full maintenance");
+
+ if (error_detected_) {
+ TRY(Repair());
+ }
SectorDescriptor* sector = sectors_.last_new();
@@ -678,12 +750,20 @@
}
}
- DBG("Garbage Collect all complete");
+ DBG("Full maintenance complete");
return Status::OK;
}
-Status KeyValueStore::GarbageCollectPartial(
- span<const Address> reserved_addresses) {
+Status KeyValueStore::GarbageCollect(span<const Address> reserved_addresses) {
+ if (initialized_ == InitializationState::kNotInitialized) {
+ return Status::FAILED_PRECONDITION;
+ }
+
+ // Do automatic repair, if KVS options allow for it.
+ if (error_detected_ && options_.recovery != ErrorRecovery::kManual) {
+ TRY(Repair());
+ }
+
DBG("Garbage Collect a single sector");
for (Address address : reserved_addresses) {
DBG(" Avoid address %u", unsigned(address));
@@ -720,6 +800,7 @@
Status KeyValueStore::GarbageCollectSector(
SectorDescriptor& sector_to_gc, span<const Address> reserved_addresses) {
+ DBG(" Garbage Collect sector %u", sectors_.Index(sector_to_gc));
// Step 1: Move any valid entries in the GC sector to other sectors
if (sector_to_gc.valid_bytes() != 0) {
for (const EntryMetadata& metadata : entry_cache_) {
@@ -736,7 +817,7 @@
}
// Step 2: Reinitialize the sector
- sector_to_gc.set_writable_bytes(0);
+ sector_to_gc.mark_corrupt();
TRY(partition_.Erase(sectors_.BaseAddress(sector_to_gc), 1));
sector_to_gc.set_writable_bytes(partition_.sector_size_bytes());
@@ -744,6 +825,171 @@
return Status::OK;
}
+// Add any missing redundant entries/copies for a key.
+//
+// Reads the key's first stored copy, verifies its checksum, and then writes
+// additional copies until `metadata` holds redundancy() addresses. Each new
+// address is appended to `metadata` and the destination sector's writable and
+// valid byte counts are updated.
+//
+// Returns OK when the key has redundancy() copies. Otherwise returns the first
+// failing status from reading/verifying the source copy, finding space,
+// copying, or verifying the new copy. A sector whose copy or verification
+// fails is marked corrupt via MarkSectorCorruptIfNotOk().
+Status KeyValueStore::AddRedundantEntries(EntryMetadata& metadata) {
+  Entry entry;
+
+  // For simplicity use just the first copy. Any known bad copies should have
+  // been removed already.
+  // TODO: Add support to read other copies if needed.
+  TRY(Entry::Read(partition_, metadata.first_address(), formats_, &entry));
+  TRY(entry.VerifyChecksumInFlash());
+
+  // Keep adding copies until the key reaches the required redundancy. Each
+  // successful pass appends exactly one address to the metadata.
+  while (metadata.addresses().size() < redundancy()) {
+    SectorDescriptor* new_sector = nullptr;
+    TRY(sectors_.FindSpace(&new_sector, entry.size(), metadata.addresses()));
+
+    const Address new_address = sectors_.NextWritableAddress(*new_sector);
+    const StatusWithSize result = entry.Copy(new_address);
+    TRY(MarkSectorCorruptIfNotOk(result.status(), new_sector));
+
+    if (options_.verify_on_write) {
+      // Verify the data that was just written, not the source copy: read the
+      // entry back from new_address and check its checksum in flash.
+      // NOTE(review): assumes Entry::Copy() leaves `entry` pointing at the
+      // source address - confirm; if it does not, this read-back is merely
+      // redundant.
+      Entry new_entry;
+      TRY(MarkSectorCorruptIfNotOk(
+          Entry::Read(partition_, new_address, formats_, &new_entry),
+          new_sector));
+      TRY(MarkSectorCorruptIfNotOk(new_entry.VerifyChecksumInFlash(),
+                                   new_sector));
+    }
+
+    // Entry was written successfully; update descriptor's address and the
+    // sector descriptors to reflect the new entry.
+    new_sector->RemoveWritableBytes(result.size());
+    new_sector->AddValidBytes(result.size());
+
+    metadata.AddNewAddress(new_address);
+  }
+  return Status::OK;
+}
+
+// Garbage collects every sector currently marked corrupt.
+//
+// All corrupt sectors are attempted even when an earlier one fails. If the
+// first pass does not end with an OK status, one more pass is made, because a
+// sector collected later in the pass may have freed the space an earlier
+// sector needed.
+//
+// Returns OK when no sector failed in the final pass; otherwise returns a
+// failing status recorded during that pass (a non-RESOURCE_EXHAUSTED failure,
+// once recorded, is not overwritten).
+Status KeyValueStore::RepairCorruptSectors() {
+  constexpr size_t kMaxPasses = 2;
+  Status repair_status = Status::OK;
+
+  for (size_t pass = 1; pass <= kMaxPasses; ++pass) {
+    // RESOURCE_EXHAUSTED means no space was found for relocation; start the
+    // new pass from a clean OK status.
+    if (repair_status == Status::RESOURCE_EXHAUSTED) {
+      repair_status = Status::OK;
+    }
+
+    DBG(" Pass %u", unsigned(pass));
+    for (SectorDescriptor& sector : sectors_) {
+      if (!sector.corrupt()) {
+        continue;
+      }
+
+      DBG(" Found sector %u with corruption", sectors_.Index(sector));
+      Status sector_status = GarbageCollectSector(sector, {});
+      if (sector_status.ok()) {
+        error_stats_.corrupt_sectors_recovered += 1;
+        continue;
+      }
+
+      // Record the failure unless a more specific (non-RESOURCE_EXHAUSTED)
+      // failure has already been captured.
+      const bool may_overwrite =
+          repair_status.ok() || repair_status == Status::RESOURCE_EXHAUSTED;
+      if (may_overwrite) {
+        repair_status = sector_status;
+      }
+    }
+    DBG(" Pass %u complete", unsigned(pass));
+
+    if (repair_status.ok()) {
+      break;
+    }
+  }
+
+  return repair_status;
+}
+
+// Makes sure at least one sector is completely empty.
+//
+// Returns OK when an empty sector already exists. Otherwise runs
+// GarbageCollect() with no reserved addresses and returns its status.
+//
+// NOTE(review): GarbageCollect() returns FAILED_PRECONDITION while
+// initialized_ is kNotInitialized, and calls Repair() when error_detected_ is
+// set and recovery is not manual. Repair() calls this function, so confirm
+// that the Init()-time repair path can actually free a sector and that
+// Repair() -> GarbageCollect() -> Repair() cannot re-enter without bound.
+Status KeyValueStore::EnsureFreeSectorExists() {
+  DBG(" Find empty sector");
+  for (SectorDescriptor& sector : sectors_) {
+    if (sector.Empty(partition_.sector_size_bytes())) {
+      DBG(" Empty sector found");
+      return Status::OK;
+    }
+  }
+
+  DBG(" No empty sector found, attempting to GC a free sector");
+  const Status gc_status = GarbageCollect(span<const Address, 0>());
+  if (!gc_status.ok()) {
+    DBG(" Unable to free an empty sector");
+  }
+  return gc_status;
+}
+
+// Brings every key in the entry cache up to redundancy() stored copies.
+//
+// Does nothing when redundancy() is 1. Otherwise each key with too few copies
+// is handed to AddRedundantEntries(); all such keys are attempted even if an
+// earlier one fails.
+//
+// Returns OK if every attempted key was filled in, otherwise the status of the
+// first key that could not be.
+Status KeyValueStore::EnsureEntryRedundancy() {
+  if (redundancy() == 1) {
+    DBG(" Redundancy not in use, nothing to check");
+    return Status::OK;
+  }
+
+  DBG(" Write any needed additional duplicate copies of key to fulfill %u "
+      "redundancy",
+      unsigned(redundancy()));
+
+  Status first_failure = Status::OK;
+  for (const EntryMetadata& key_metadata : entry_cache_) {
+    const bool has_all_copies =
+        key_metadata.addresses().size() >= redundancy();
+    if (!has_all_copies) {
+      DBG(" Key with %u of %u copies found, adding missing copies",
+          unsigned(key_metadata.addresses().size()),
+          unsigned(redundancy()));
+      // TODO: Add non-const iterator to the entry_cache_ and remove this
+      // const_cast.
+      Status copy_status =
+          AddRedundantEntries(const_cast<EntryMetadata&>(key_metadata));
+      if (!copy_status.ok()) {
+        DBG(" Failed to add key missing copies");
+        // Only the first failure is reported to the caller.
+        if (first_failure.ok()) {
+          first_failure = copy_status;
+        }
+        continue;
+      }
+      error_stats_.missing_redundant_entries_recovered += 1;
+      DBG(" Key missing copies added");
+    }
+  }
+
+  return first_failure;
+}
+
+// Runs all repair steps and reports the first error any of them produced.
+// Every step is run even when an earlier step fails.
+//
+// On overall success error_detected_ is cleared and the KVS is moved to the
+// kReady state; on failure neither is modified here.
+Status KeyValueStore::Repair() {
+  Status first_error = Status::OK;
+  // Keeps the earliest non-OK step result and ignores later ones.
+  const auto record = [&first_error](Status step_status) {
+    if (first_error.ok()) {
+      first_error = step_status;
+    }
+  };
+
+  DBG("KVS repair");
+
+  // Step 1: Garbage collect any sectors marked as corrupt.
+  record(RepairCorruptSectors());
+
+  // Step 2: Make sure there is at least 1 empty sector. This has to be checked
+  // separately from step 1, because an empty sector found during step 1 might
+  // be written to by a later GC that then fails without freeing a sector.
+  record(EnsureFreeSectorExists());
+
+  // Step 3: Make sure each stored key has the full number of redundant
+  // entries.
+  record(EnsureEntryRedundancy());
+
+  if (!first_error.ok()) {
+    return first_error;
+  }
+
+  error_detected_ = false;
+  initialized_ = InitializationState::kReady;
+  return first_error;
+}
+
KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
string_view key,
span<const byte> value,
@@ -862,7 +1108,7 @@
void KeyValueStore::LogKeyDescriptor() const {
DBG("Key descriptors: count %zu", entry_cache_.total_entries());
- for (auto& metadata : entry_cache_) {
+ for (const EntryMetadata& metadata : entry_cache_) {
DBG(" - Key: %s, hash %#zx, transaction ID %zu, first address %#zx",
metadata.state() == EntryState::kDeleted ? "Deleted" : "Valid",
static_cast<size_t>(metadata.hash()),