Merge changes Iaf9e851d,Ic4cbf797

* changes:
  Keystore 2.0: Revise GC.
  Keystore 2.0: Add shelf to async tasks.
diff --git a/keystore2/src/async_task.rs b/keystore2/src/async_task.rs
index 6edd760..9732e79 100644
--- a/keystore2/src/async_task.rs
+++ b/keystore2/src/async_task.rs
@@ -19,9 +19,9 @@
 //! processed all tasks before it terminates.
 //! Note that low priority tasks are processed only when the high priority queue is empty.
 
-use std::time::Duration;
+use std::{any::Any, any::TypeId, time::Duration};
 use std::{
-    collections::VecDeque,
+    collections::{HashMap, VecDeque},
     sync::Arc,
     sync::{Condvar, Mutex, MutexGuard},
     thread,
@@ -33,15 +33,74 @@
     Running,
 }
 
+/// The Shelf allows async tasks to store state across invocations.
+/// Note: Store elves at your own peril ;-).
+#[derive(Debug, Default)]
+pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);
+
+impl Shelf {
+    /// Get a reference to the shelved data of type T. Returns Some if the data exists.
+    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
+        self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>())
+    }
+
+    /// Get a mutable reference to the shelved data of type T. If a T was inserted using put,
+    /// get_mut, or get_or_put_with.
+    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
+        self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>())
+    }
+
+    /// Remove the entry of the given type and returns the stored data if it existed.
+    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
+        self.0.remove(&TypeId::of::<T>()).and_then(|v| v.downcast::<T>().ok().map(|b| *b))
+    }
+
+    /// Puts data `v` on the shelf. If there already was an entry of type T it is returned.
+    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
+        self.0
+            .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>)
+            .and_then(|v| v.downcast::<T>().ok().map(|b| *b))
+    }
+
+    /// Gets a mutable reference to the entry of the given type and default creates it if necessary.
+    /// The type must implement Default.
+    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
+        self.0
+            .entry(TypeId::of::<T>())
+            .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send>)
+            .downcast_mut::<T>()
+            .unwrap()
+    }
+
+    /// Gets a mutable reference to the entry of the given type or creates it using the init
+    /// function. Init is not executed if the entry already existed.
+    pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T
+    where
+        F: FnOnce() -> T,
+    {
+        self.0
+            .entry(TypeId::of::<T>())
+            .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>)
+            .downcast_mut::<T>()
+            .unwrap()
+    }
+}
+
 struct AsyncTaskState {
     state: State,
     thread: Option<thread::JoinHandle<()>>,
-    hi_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
-    lo_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
+    hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
+    lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
+    /// The store allows tasks to store state across invocations. It is passed to each invocation
+    /// of each task. Tasks need to cooperate on the ids they use for storing state.
+    shelf: Option<Shelf>,
 }
 
 /// AsyncTask spawns one worker thread on demand to process jobs inserted into
-/// a low and a high priority work queue.
+/// a low and a high priority work queue. The queues are processed FIFO, and low
+/// priority queue is processed if the high priority queue is empty.
+/// Note: Because there is only one worker thread at a time for a given AsyncTask instance,
+/// all scheduled requests are guaranteed to be serialized with respect to one another.
 pub struct AsyncTask {
     state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
 }
@@ -56,6 +115,7 @@
                     thread: None,
                     hi_prio_req: VecDeque::new(),
                     lo_prio_req: VecDeque::new(),
+                    shelf: None,
                 }),
             )),
         }
@@ -68,7 +128,7 @@
     /// preempt them.
     pub fn queue_hi<F>(&self, f: F)
     where
-        F: FnOnce() + Send + 'static,
+        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
     {
         self.queue(f, true)
     }
@@ -79,14 +139,14 @@
     /// priority jobs.
     pub fn queue_lo<F>(&self, f: F)
     where
-        F: FnOnce() + Send + 'static,
+        F: FnOnce(&mut Shelf) + Send + 'static,
     {
         self.queue(f, false)
     }
 
     fn queue<F>(&self, f: F, hi_prio: bool)
     where
-        F: FnOnce() + Send + 'static,
+        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
     {
         let (ref condvar, ref state) = *self.state;
         let mut state = state.lock().unwrap();
@@ -112,6 +172,8 @@
 
         state.thread = Some(thread::spawn(move || {
             let (ref condvar, ref state) = *cloned_state;
+            // When the worker starts, it takes the shelf and puts it on the stack.
+            let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
             loop {
                 if let Some(f) = {
                     let (mut state, timeout) = condvar
@@ -129,13 +191,17 @@
                         (Some(f), _, _) => Some(f),
                         (None, false, _) => state.lo_prio_req.pop_front(),
                         (None, true, true) => {
+                            // When the worker exits it puts the shelf back into the shared
+                            // state for the next worker to use. So state is preserved not
+                            // only across invocations but also across worker thread shut down.
+                            state.shelf = Some(shelf);
                             state.state = State::Exiting;
                             break;
                         }
                         (None, true, false) => None,
                     }
                 } {
-                    f()
+                    f(&mut shelf)
                 }
             }
         }));
diff --git a/keystore2/src/database.rs b/keystore2/src/database.rs
index 3789d28..dc6d7a0 100644
--- a/keystore2/src/database.rs
+++ b/keystore2/src/database.rs
@@ -41,12 +41,15 @@
 //! from the database module these functions take permission check
 //! callbacks.
 
-use crate::db_utils::{self, SqlField};
 use crate::error::{Error as KsError, ErrorCode, ResponseCode};
 use crate::impl_metadata; // This is in db_utils.rs
 use crate::key_parameter::{KeyParameter, Tag};
 use crate::permission::KeyPermSet;
 use crate::utils::get_current_time_in_seconds;
+use crate::{
+    db_utils::{self, SqlField},
+    gc::Gc,
+};
 use anyhow::{anyhow, Context, Result};
 use std::{convert::TryFrom, convert::TryInto, ops::Deref, time::SystemTimeError};
 
@@ -95,17 +98,7 @@
     /// A metadata entry for key entries.
     #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
     pub enum KeyMetaEntry {
-        /// If present, indicates that the sensitive part of key
-        /// is encrypted with another key or a key derived from a password.
-        EncryptedBy(EncryptedBy) with accessor encrypted_by,
-        /// If the blob is password encrypted this field is set to the
-        /// salt used for the key derivation.
-        Salt(Vec<u8>) with accessor salt,
-        /// If the blob is encrypted, this field is set to the initialization vector.
-        Iv(Vec<u8>) with accessor iv,
-        /// If the blob is encrypted, this field holds the AEAD TAG.
-        AeadTag(Vec<u8>) with accessor aead_tag,
-        /// Creation date of a the key entry.
+        /// Date of the creation of the key entry.
         CreationDate(DateTime) with accessor creation_date,
         /// Expiration date for attestation keys.
         AttestationExpirationDate(DateTime) with accessor attestation_expiration_date,
@@ -166,6 +159,76 @@
     }
 }
 
+impl_metadata!(
+    /// A set of metadata for key blobs.
+    #[derive(Debug, Default, Eq, PartialEq)]
+    pub struct BlobMetaData;
+    /// A metadata entry for key blobs.
+    #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
+    pub enum BlobMetaEntry {
+        /// If present, indicates that the blob is encrypted with another key or a key derived
+        /// from a password.
+        EncryptedBy(EncryptedBy) with accessor encrypted_by,
+        /// If the blob is password encrypted this field is set to the
+        /// salt used for the key derivation.
+        Salt(Vec<u8>) with accessor salt,
+        /// If the blob is encrypted, this field is set to the initialization vector.
+        Iv(Vec<u8>) with accessor iv,
+        /// If the blob is encrypted, this field holds the AEAD TAG.
+        AeadTag(Vec<u8>) with accessor aead_tag,
+        /// The uuid of the owning KeyMint instance.
+        KmUuid(Uuid) with accessor km_uuid,
+        //  --- ADD NEW META DATA FIELDS HERE ---
+        // For backwards compatibility add new entries only to
+        // end of this list and above this comment.
+    };
+);
+
+impl BlobMetaData {
+    fn load_from_db(blob_id: i64, tx: &Transaction) -> Result<Self> {
+        let mut stmt = tx
+            .prepare(
+                "SELECT tag, data from persistent.blobmetadata
+                    WHERE blobentryid = ?;",
+            )
+            .context("In BlobMetaData::load_from_db: prepare statement failed.")?;
+
+        let mut metadata: HashMap<i64, BlobMetaEntry> = Default::default();
+
+        let mut rows =
+            stmt.query(params![blob_id]).context("In BlobMetaData::load_from_db: query failed.")?;
+        db_utils::with_rows_extract_all(&mut rows, |row| {
+            let db_tag: i64 = row.get(0).context("Failed to read tag.")?;
+            metadata.insert(
+                db_tag,
+                BlobMetaEntry::new_from_sql(db_tag, &SqlField::new(1, &row))
+                    .context("Failed to read BlobMetaEntry.")?,
+            );
+            Ok(())
+        })
+        .context("In BlobMetaData::load_from_db.")?;
+
+        Ok(Self { data: metadata })
+    }
+
+    fn store_in_db(&self, blob_id: i64, tx: &Transaction) -> Result<()> {
+        let mut stmt = tx
+            .prepare(
+                "INSERT or REPLACE INTO persistent.blobmetadata (blobentryid, tag, data)
+                    VALUES (?, ?, ?);",
+            )
+            .context("In BlobMetaData::store_in_db: Failed to prepare statement.")?;
+
+        let iter = self.data.iter();
+        for (tag, entry) in iter {
+            stmt.insert(params![blob_id, tag, entry,]).with_context(|| {
+                format!("In BlobMetaData::store_in_db: Failed to insert {:?}", entry)
+            })?;
+        }
+        Ok(())
+    }
+}
+
 /// Indicates the type of the keyentry.
 #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
 pub enum KeyType {
@@ -531,7 +594,7 @@
 #[derive(Debug, Default, Eq, PartialEq)]
 pub struct KeyEntry {
     id: i64,
-    km_blob: Option<Vec<u8>>,
+    key_blob_info: Option<(Vec<u8>, BlobMetaData)>,
     cert: Option<Vec<u8>>,
     cert_chain: Option<Vec<u8>>,
     km_uuid: Uuid,
@@ -546,12 +609,12 @@
         self.id
     }
     /// Exposes the optional KeyMint blob.
-    pub fn km_blob(&self) -> &Option<Vec<u8>> {
-        &self.km_blob
+    pub fn key_blob_info(&self) -> &Option<(Vec<u8>, BlobMetaData)> {
+        &self.key_blob_info
     }
-    /// Extracts the Optional KeyMint blob.
-    pub fn take_km_blob(&mut self) -> Option<Vec<u8>> {
-        self.km_blob.take()
+    /// Extracts the Optional KeyMint blob including its metadata.
+    pub fn take_key_blob_info(&mut self) -> Option<(Vec<u8>, BlobMetaData)> {
+        self.key_blob_info.take()
     }
     /// Exposes the optional public certificate.
     pub fn cert(&self) -> &Option<Vec<u8>> {
@@ -616,10 +679,39 @@
     }
 }
 
+/// This trait is private to the database module. It is used to convey whether or not the garbage
+/// collector shall be invoked after a database access. All closures passed to
+/// `KeystoreDB::with_transaction` return a tuple (bool, T) where the bool indicates if the
+/// gc needs to be triggered. This convenience function allows to turn any anyhow::Result<T>
+/// into anyhow::Result<(bool, T)> by simply appending one of `.do_gc(bool)`, `.no_gc()`, or
+/// `.need_gc()`.
+trait DoGc<T> {
+    fn do_gc(self, need_gc: bool) -> Result<(bool, T)>;
+
+    fn no_gc(self) -> Result<(bool, T)>;
+
+    fn need_gc(self) -> Result<(bool, T)>;
+}
+
+impl<T> DoGc<T> for Result<T> {
+    fn do_gc(self, need_gc: bool) -> Result<(bool, T)> {
+        self.map(|r| (need_gc, r))
+    }
+
+    fn no_gc(self) -> Result<(bool, T)> {
+        self.do_gc(false)
+    }
+
+    fn need_gc(self) -> Result<(bool, T)> {
+        self.do_gc(true)
+    }
+}
+
 /// KeystoreDB wraps a connection to an SQLite database and tracks its
 /// ownership. It also implements all of Keystore 2.0's database functionality.
 pub struct KeystoreDB {
     conn: Connection,
+    gc: Option<Gc>,
 }
 
 /// Database representation of the monotonic time retrieved from the system call clock_gettime with
@@ -715,7 +807,7 @@
     /// It also attempts to initialize all of the tables.
     /// KeystoreDB cannot be used by multiple threads.
     /// Each thread should open their own connection using `thread_local!`.
-    pub fn new(db_root: &Path) -> Result<Self> {
+    pub fn new(db_root: &Path, gc: Option<Gc>) -> Result<Self> {
         // Build the path to the sqlite file.
         let mut persistent_path = db_root.to_path_buf();
         persistent_path.push("persistent.sqlite");
@@ -729,9 +821,9 @@
         // On busy fail Immediately. It is unlikely to succeed given a bug in sqlite.
         conn.busy_handler(None).context("In KeystoreDB::new: Failed to set busy handler.")?;
 
-        let mut db = Self { conn };
+        let mut db = Self { conn, gc };
         db.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::init_tables(tx).context("Trying to initialize tables.")
+            Self::init_tables(tx).context("Trying to initialize tables.").no_gc()
         })?;
         Ok(db)
     }
@@ -782,6 +874,24 @@
         .context("Failed to create index blobentry_keyentryid_index.")?;
 
         tx.execute(
+            "CREATE TABLE IF NOT EXISTS persistent.blobmetadata (
+                     id INTEGER PRIMARY KEY,
+                     blobentryid INTEGER,
+                     tag INTEGER,
+                     data ANY,
+                     UNIQUE (blobentryid, tag));",
+            NO_PARAMS,
+        )
+        .context("Failed to initialize \"blobmetadata\" table.")?;
+
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.blobmetadata_blobentryid_index
+            ON blobmetadata(blobentryid);",
+            NO_PARAMS,
+        )
+        .context("Failed to create index blobmetadata_blobentryid_index.")?;
+
+        tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.keyparameter (
                      keyentryid INTEGER,
                      tag INTEGER,
@@ -894,77 +1004,73 @@
         Ok(conn)
     }
 
-    /// Get one unreferenced key. There is no particular order in which the keys are returned.
-    fn get_unreferenced_key_id(tx: &Transaction) -> Result<Option<i64>> {
-        tx.query_row(
-            "SELECT id FROM persistent.keyentry WHERE state = ?",
-            params![KeyLifeCycle::Unreferenced],
-            |row| row.get(0),
-        )
-        .optional()
-        .context("In get_unreferenced_key_id: Trying to get unreferenced key id.")
-    }
-
-    /// Returns a key id guard and key entry for one unreferenced key entry. Of the optional
-    /// fields of the key entry only the km_blob field will be populated. This is required
-    /// to subject the blob to its KeyMint instance for deletion.
-    pub fn get_unreferenced_key(&mut self) -> Result<Option<(KeyIdGuard, KeyEntry)>> {
-        self.with_transaction(TransactionBehavior::Deferred, |tx| {
-            let key_id = match Self::get_unreferenced_key_id(tx)
-                .context("Trying to get unreferenced key id")?
-            {
-                None => return Ok(None),
-                Some(id) => KEY_ID_LOCK.try_get(id).ok_or_else(KsError::sys).context(concat!(
-                    "A key id lock was held for an unreferenced key. ",
-                    "This should never happen."
-                ))?,
-            };
-            let key_entry = Self::load_key_components(tx, KeyEntryLoadBits::KM, key_id.id())
-                .context("Trying to get key components.")?;
-            Ok(Some((key_id, key_entry)))
-        })
-        .context("In get_unreferenced_key.")
-    }
-
-    /// This function purges all remnants of a key entry from the database.
-    /// Important: This does not check if the key was unreferenced, nor does it
-    /// subject the key to its KeyMint instance for permanent invalidation.
-    /// This function should only be called by the garbage collector.
-    /// To delete a key call `mark_unreferenced`, which transitions the key to the unreferenced
-    /// state, deletes all grants to the key, and notifies the garbage collector.
-    /// The garbage collector will:
-    ///  1. Call get_unreferenced_key.
-    ///  2. Determine the proper way to dispose of sensitive key material, e.g., call
-    ///     `KeyMintDevice::delete()`.
-    ///  3. Call `purge_key_entry`.
-    pub fn purge_key_entry(&mut self, key_id: KeyIdGuard) -> Result<()> {
+    /// This function is intended to be used by the garbage collector.
+    /// It deletes the blob given by `blob_id_to_delete`. It then tries to find a superseded
+    /// key blob that might need special handling by the garbage collector.
+    /// If no further superseded blobs can be found it deletes all other superseded blobs that don't
+    /// need special handling and returns None.
+    pub fn handle_next_superseded_blob(
+        &mut self,
+        blob_id_to_delete: Option<i64>,
+    ) -> Result<Option<(i64, Vec<u8>, BlobMetaData)>> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            tx.execute("DELETE FROM persistent.keyentry WHERE id = ?;", params![key_id.id()])
-                .context("Trying to delete keyentry.")?;
-            tx.execute(
-                "DELETE FROM persistent.blobentry WHERE keyentryid = ?;",
-                params![key_id.id()],
-            )
-            .context("Trying to delete blobentries.")?;
-            tx.execute(
-                "DELETE FROM persistent.keymetadata WHERE keyentryid = ?;",
-                params![key_id.id()],
-            )
-            .context("Trying to delete keymetadata.")?;
-            tx.execute(
-                "DELETE FROM persistent.keyparameter WHERE keyentryid = ?;",
-                params![key_id.id()],
-            )
-            .context("Trying to delete keyparameters.")?;
-            let grants_deleted = tx
-                .execute("DELETE FROM persistent.grant WHERE keyentryid = ?;", params![key_id.id()])
-                .context("Trying to delete grants.")?;
-            if grants_deleted != 0 {
-                log::error!("Purged key that still had grants. This should not happen.");
+            // Delete the given blob if one was given.
+            if let Some(blob_id_to_delete) = blob_id_to_delete {
+                tx.execute(
+                    "DELETE FROM persistent.blobmetadata WHERE blobentryid = ?;",
+                    params![blob_id_to_delete],
+                )
+                .context("Trying to delete blob metadata.")?;
+                tx.execute(
+                    "DELETE FROM persistent.blobentry WHERE id = ?;",
+                    params![blob_id_to_delete],
+                )
+                .context("Trying to blob.")?;
             }
-            Ok(())
+
+            // Find another superseded keyblob load its metadata and return it.
+            if let Some((blob_id, blob)) = tx
+                .query_row(
+                    "SELECT id, blob FROM persistent.blobentry
+                     WHERE subcomponent_type = ?
+                     AND (
+                         id NOT IN (
+                             SELECT MAX(id) FROM persistent.blobentry
+                             WHERE subcomponent_type = ?
+                             GROUP BY keyentryid, subcomponent_type
+                         )
+                     OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                 );",
+                    params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
+                    |row| Ok((row.get(0)?, row.get(1)?)),
+                )
+                .optional()
+                .context("Trying to query superseded blob.")?
+            {
+                let blob_metadata = BlobMetaData::load_from_db(blob_id, tx)
+                    .context("Trying to load blob metadata.")?;
+                return Ok(Some((blob_id, blob, blob_metadata))).no_gc();
+            }
+
+            // We did not find any superseded key blob, so let's remove other superseded blob in
+            // one transaction.
+            tx.execute(
+                "DELETE FROM persistent.blobentry
+                 WHERE NOT subcomponent_type = ?
+                 AND (
+                     id NOT IN (
+                        SELECT MAX(id) FROM persistent.blobentry
+                        WHERE NOT subcomponent_type = ?
+                        GROUP BY keyentryid, subcomponent_type
+                     ) OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                 );",
+                params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
+            )
+            .context("Trying to purge superseded blobs.")?;
+
+            Ok(None).no_gc()
         })
-        .context("In purge_key_entry.")
+        .context("In handle_next_superseded_blob.")
     }
 
     /// This maintenance function should be called only once before the database is used for the
@@ -982,6 +1088,7 @@
                 params![KeyLifeCycle::Unreferenced, KeyLifeCycle::Existing],
             )
             .context("Failed to execute query.")
+            .need_gc()
         })
         .context("In cleanup_leftovers.")
     }
@@ -999,7 +1106,7 @@
         create_new_key: F,
     ) -> Result<(KeyIdGuard, KeyEntry)>
     where
-        F: Fn() -> Result<(Vec<u8>, KeyMetaData)>,
+        F: Fn() -> Result<(Vec<u8>, BlobMetaData)>,
     {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let id = {
@@ -1055,22 +1162,26 @@
 
                     let (blob, metadata) =
                         create_new_key().context("In get_or_create_key_with.")?;
-                    Self::set_blob_internal(&tx, id, SubComponentType::KEY_BLOB, Some(&blob))
-                        .context("In get_of_create_key_with.")?;
-                    metadata.store_in_db(id, &tx).context("In get_or_create_key_with.")?;
+                    Self::set_blob_internal(
+                        &tx,
+                        id,
+                        SubComponentType::KEY_BLOB,
+                        Some(&blob),
+                        Some(&metadata),
+                    )
+                    .context("In get_of_create_key_with.")?;
                     (
                         id,
                         KeyEntry {
                             id,
-                            km_blob: Some(blob),
-                            metadata,
+                            key_blob_info: Some((blob, metadata)),
                             pure_cert: false,
                             ..Default::default()
                         },
                     )
                 }
             };
-            Ok((KEY_ID_LOCK.get(id), entry))
+            Ok((KEY_ID_LOCK.get(id), entry)).no_gc()
         })
         .context("In get_or_create_key_with.")
     }
@@ -1092,7 +1203,7 @@
     /// or DatabaseLocked is encountered.
     fn with_transaction<T, F>(&mut self, behavior: TransactionBehavior, f: F) -> Result<T>
     where
-        F: Fn(&Transaction) -> Result<T>,
+        F: Fn(&Transaction) -> Result<(bool, T)>,
     {
         loop {
             match self
@@ -1115,6 +1226,14 @@
                 }
             }
         }
+        .map(|(need_gc, result)| {
+            if need_gc {
+                if let Some(ref gc) = self.gc {
+                    gc.notify_gc();
+                }
+            }
+            result
+        })
     }
 
     fn is_locked_error(e: &anyhow::Error) -> bool {
@@ -1142,7 +1261,7 @@
         km_uuid: &Uuid,
     ) -> Result<KeyIdGuard> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::create_key_entry_internal(tx, domain, namespace, km_uuid)
+            Self::create_key_entry_internal(tx, domain, namespace, km_uuid).no_gc()
         })
         .context("In create_key_entry.")
     }
@@ -1204,12 +1323,18 @@
                 })
                 .context("In create_key_entry")?,
             );
-            Self::set_blob_internal(&tx, key_id.0, SubComponentType::KEY_BLOB, Some(private_key))?;
+            Self::set_blob_internal(
+                &tx,
+                key_id.0,
+                SubComponentType::KEY_BLOB,
+                Some(private_key),
+                None,
+            )?;
             let mut metadata = KeyMetaData::new();
             metadata.add(KeyMetaEntry::AttestationMacedPublicKey(maced_public_key.to_vec()));
             metadata.add(KeyMetaEntry::AttestationRawPubKey(raw_public_key.to_vec()));
             metadata.store_in_db(key_id.0, &tx)?;
-            Ok(())
+            Ok(()).no_gc()
         })
         .context("In create_attestation_key_entry")
     }
@@ -1226,9 +1351,10 @@
         key_id: &KeyIdGuard,
         sc_type: SubComponentType,
         blob: Option<&[u8]>,
+        blob_metadata: Option<&BlobMetaData>,
     ) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::set_blob_internal(&tx, key_id.0, sc_type, blob)
+            Self::set_blob_internal(&tx, key_id.0, sc_type, blob, blob_metadata).need_gc()
         })
         .context("In set_blob.")
     }
@@ -1238,6 +1364,7 @@
         key_id: i64,
         sc_type: SubComponentType,
         blob: Option<&[u8]>,
+        blob_metadata: Option<&BlobMetaData>,
     ) -> Result<()> {
         match (blob, sc_type) {
             (Some(blob), _) => {
@@ -1247,6 +1374,16 @@
                     params![sc_type, key_id, blob],
                 )
                 .context("In set_blob_internal: Failed to insert blob.")?;
+                if let Some(blob_metadata) = blob_metadata {
+                    let blob_id = tx
+                        .query_row("SELECT MAX(id) FROM persistent.blobentry;", NO_PARAMS, |row| {
+                            row.get(0)
+                        })
+                        .context("In set_blob_internal: Failed to get new blob id.")?;
+                    blob_metadata
+                        .store_in_db(blob_id, tx)
+                        .context("In set_blob_internal: Trying to store blob metadata.")?;
+                }
             }
             (None, SubComponentType::CERT) | (None, SubComponentType::CERT_CHAIN) => {
                 tx.execute(
@@ -1266,13 +1403,10 @@
 
     /// Inserts a collection of key parameters into the `persistent.keyparameter` table
     /// and associates them with the given `key_id`.
-    pub fn insert_keyparameter(
-        &mut self,
-        key_id: &KeyIdGuard,
-        params: &[KeyParameter],
-    ) -> Result<()> {
+    #[cfg(test)]
+    fn insert_keyparameter(&mut self, key_id: &KeyIdGuard, params: &[KeyParameter]) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::insert_keyparameter_internal(tx, key_id, params)
+            Self::insert_keyparameter_internal(tx, key_id, params).no_gc()
         })
         .context("In insert_keyparameter.")
     }
@@ -1304,13 +1438,10 @@
     }
 
     /// Insert a set of key entry specific metadata into the database.
-    pub fn insert_key_metadata(
-        &mut self,
-        key_id: &KeyIdGuard,
-        metadata: &KeyMetaData,
-    ) -> Result<()> {
+    #[cfg(test)]
+    fn insert_key_metadata(&mut self, key_id: &KeyIdGuard, metadata: &KeyMetaData) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            metadata.store_in_db(key_id.0, &tx)
+            metadata.store_in_db(key_id.0, &tx).no_gc()
         })
         .context("In insert_key_metadata.")
     }
@@ -1370,9 +1501,15 @@
                 expiration_date,
             )));
             metadata.store_in_db(key_id, &tx).context("Failed to insert key metadata.")?;
-            Self::set_blob_internal(&tx, key_id, SubComponentType::CERT_CHAIN, Some(cert_chain))
-                .context("Failed to insert cert chain")?;
-            Ok(())
+            Self::set_blob_internal(
+                &tx,
+                key_id,
+                SubComponentType::CERT_CHAIN,
+                Some(cert_chain),
+                None,
+            )
+            .context("Failed to insert cert chain")?;
+            Ok(()).no_gc()
         })
         .context("In store_signed_attestation_certificate_chain: ")
     }
@@ -1434,7 +1571,7 @@
                     result
                 ));
             }
-            Ok(())
+            Ok(()).no_gc()
         })
         .context("In assign_attestation_key: ")
     }
@@ -1476,7 +1613,7 @@
                 )?
                 .collect::<rusqlite::Result<Vec<Vec<u8>>>>()
                 .context("Failed to execute statement")?;
-            Ok(rows)
+            Ok(rows).no_gc()
         })
         .context("In fetch_unsigned_attestation_keys")
     }
@@ -1511,7 +1648,7 @@
                     num_deleted += 1;
                 }
             }
-            Ok(num_deleted)
+            Ok(num_deleted).do_gc(num_deleted != 0)
         })
         .context("In delete_expired_attestation_keys: ")
     }
@@ -1576,7 +1713,7 @@
                     _ => {}
                 }
             }
-            Ok(AttestationPoolStatus { expiring, unassigned, attested, total })
+            Ok(AttestationPoolStatus { expiring, unassigned, attested, total }).no_gc()
         })
         .context("In get_attestation_pool_status: ")
     }
@@ -1597,8 +1734,9 @@
                     .context(format!("Domain {:?} must be either App or SELinux.", domain));
             }
         }
-        let mut stmt = self.conn.prepare(
-            "SELECT subcomponent_type, blob
+        self.with_transaction(TransactionBehavior::Deferred, |tx| {
+            let mut stmt = tx.prepare(
+                "SELECT subcomponent_type, blob
              FROM persistent.blobentry
              WHERE keyentryid IN
                 (SELECT id
@@ -1608,48 +1746,50 @@
                        AND namespace = ?
                        AND state = ?
                        AND km_uuid = ?);",
-        )?;
-        let rows = stmt
-            .query_map(
-                params![
-                    KeyType::Attestation,
-                    domain.0 as u32,
-                    namespace,
-                    KeyLifeCycle::Live,
-                    km_uuid
-                ],
-                |row| Ok((row.get(0)?, row.get(1)?)),
-            )?
-            .collect::<rusqlite::Result<Vec<(SubComponentType, Vec<u8>)>>>()
-            .context("In retrieve_attestation_key_and_cert_chain: query failed.")?;
-        if rows.is_empty() {
-            return Ok(None);
-        } else if rows.len() != 2 {
-            return Err(KsError::sys()).context(format!(
-                concat!(
+            )?;
+            let rows = stmt
+                .query_map(
+                    params![
+                        KeyType::Attestation,
+                        domain.0 as u32,
+                        namespace,
+                        KeyLifeCycle::Live,
+                        km_uuid
+                    ],
+                    |row| Ok((row.get(0)?, row.get(1)?)),
+                )?
+                .collect::<rusqlite::Result<Vec<(SubComponentType, Vec<u8>)>>>()
+                .context("In retrieve_attestation_key_and_cert_chain: query failed.")?;
+            if rows.is_empty() {
+                return Ok(None).no_gc();
+            } else if rows.len() != 2 {
+                return Err(KsError::sys()).context(format!(
+                    concat!(
                 "In retrieve_attestation_key_and_cert_chain: Expected to get a single attestation",
                 "key chain but instead got {}."),
-                rows.len()
-            ));
-        }
-        let mut km_blob: Vec<u8> = Vec::new();
-        let mut cert_chain_blob: Vec<u8> = Vec::new();
-        for row in rows {
-            let sub_type: SubComponentType = row.0;
-            match sub_type {
-                SubComponentType::KEY_BLOB => {
-                    km_blob = row.1;
-                }
-                SubComponentType::CERT_CHAIN => {
-                    cert_chain_blob = row.1;
-                }
-                _ => Err(KsError::sys()).context("Unknown or incorrect subcomponent type.")?,
+                    rows.len()
+                ));
             }
-        }
-        Ok(Some(CertificateChain {
-            private_key: ZVec::try_from(km_blob)?,
-            cert_chain: ZVec::try_from(cert_chain_blob)?,
-        }))
+            let mut km_blob: Vec<u8> = Vec::new();
+            let mut cert_chain_blob: Vec<u8> = Vec::new();
+            for row in rows {
+                let sub_type: SubComponentType = row.0;
+                match sub_type {
+                    SubComponentType::KEY_BLOB => {
+                        km_blob = row.1;
+                    }
+                    SubComponentType::CERT_CHAIN => {
+                        cert_chain_blob = row.1;
+                    }
+                    _ => Err(KsError::sys()).context("Unknown or incorrect subcomponent type.")?,
+                }
+            }
+            Ok(Some(CertificateChain {
+                private_key: ZVec::try_from(km_blob)?,
+                cert_chain: ZVec::try_from(cert_chain_blob)?,
+            }))
+            .no_gc()
+        })
     }
 
     /// Updates the alias column of the given key id `newid` with the given alias,
@@ -1714,11 +1854,11 @@
         &mut self,
         key: &KeyDescriptor,
         params: &[KeyParameter],
-        blob: &[u8],
+        blob_info: &(&[u8], &BlobMetaData),
         cert_info: &CertificateInfo,
         metadata: &KeyMetaData,
         km_uuid: &Uuid,
-    ) -> Result<(bool, KeyIdGuard)> {
+    ) -> Result<KeyIdGuard> {
         let (alias, domain, namespace) = match key {
             KeyDescriptor { alias: Some(alias), domain: Domain::APP, nspace, blob: None }
             | KeyDescriptor { alias: Some(alias), domain: Domain::SELINUX, nspace, blob: None } => {
@@ -1732,10 +1872,17 @@
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let key_id = Self::create_key_entry_internal(tx, &domain, namespace, km_uuid)
                 .context("Trying to create new key entry.")?;
-            Self::set_blob_internal(tx, key_id.id(), SubComponentType::KEY_BLOB, Some(blob))
-                .context("Trying to insert the key blob.")?;
+            let (blob, blob_metadata) = *blob_info;
+            Self::set_blob_internal(
+                tx,
+                key_id.id(),
+                SubComponentType::KEY_BLOB,
+                Some(blob),
+                Some(&blob_metadata),
+            )
+            .context("Trying to insert the key blob.")?;
             if let Some(cert) = &cert_info.cert {
-                Self::set_blob_internal(tx, key_id.id(), SubComponentType::CERT, Some(&cert))
+                Self::set_blob_internal(tx, key_id.id(), SubComponentType::CERT, Some(&cert), None)
                     .context("Trying to insert the certificate.")?;
             }
             if let Some(cert_chain) = &cert_info.cert_chain {
@@ -1744,6 +1891,7 @@
                     key_id.id(),
                     SubComponentType::CERT_CHAIN,
                     Some(&cert_chain),
+                    None,
                 )
                 .context("Trying to insert the certificate chain.")?;
             }
@@ -1752,7 +1900,7 @@
             metadata.store_in_db(key_id.id(), tx).context("Trying to insert key metadata.")?;
             let need_gc = Self::rebind_alias(tx, &key_id, &alias, &domain, namespace)
                 .context("Trying to rebind alias.")?;
-            Ok((need_gc, key_id))
+            Ok(key_id).do_gc(need_gc)
         })
         .context("In store_new_key.")
     }
@@ -1781,8 +1929,14 @@
             let key_id = Self::create_key_entry_internal(tx, &domain, namespace, km_uuid)
                 .context("Trying to create new key entry.")?;
 
-            Self::set_blob_internal(tx, key_id.id(), SubComponentType::CERT_CHAIN, Some(cert))
-                .context("Trying to insert certificate.")?;
+            Self::set_blob_internal(
+                tx,
+                key_id.id(),
+                SubComponentType::CERT_CHAIN,
+                Some(cert),
+                None,
+            )
+            .context("Trying to insert certificate.")?;
 
             let mut metadata = KeyMetaData::new();
             metadata.add(KeyMetaEntry::CreationDate(
@@ -1791,9 +1945,9 @@
 
             metadata.store_in_db(key_id.id(), tx).context("Trying to insert key metadata.")?;
 
-            Self::rebind_alias(tx, &key_id, &alias, &domain, namespace)
+            let need_gc = Self::rebind_alias(tx, &key_id, &alias, &domain, namespace)
                 .context("Trying to rebind alias.")?;
-            Ok(key_id)
+            Ok(key_id).do_gc(need_gc)
         })
         .context("In store_new_certificate.")
     }
@@ -1954,7 +2108,7 @@
         key_id: i64,
         load_bits: KeyEntryLoadBits,
         tx: &Transaction,
-    ) -> Result<(bool, Option<Vec<u8>>, Option<Vec<u8>>, Option<Vec<u8>>)> {
+    ) -> Result<(bool, Option<(Vec<u8>, BlobMetaData)>, Option<Vec<u8>>, Option<Vec<u8>>)> {
         let mut stmt = tx
             .prepare(
                 "SELECT MAX(id), subcomponent_type, blob FROM persistent.blobentry
@@ -1965,7 +2119,7 @@
         let mut rows =
             stmt.query(params![key_id]).context("In load_blob_components: query failed.")?;
 
-        let mut km_blob: Option<Vec<u8>> = None;
+        let mut key_blob: Option<(i64, Vec<u8>)> = None;
         let mut cert_blob: Option<Vec<u8>> = None;
         let mut cert_chain_blob: Option<Vec<u8>> = None;
         let mut has_km_blob: bool = false;
@@ -1975,7 +2129,10 @@
             has_km_blob = has_km_blob || sub_type == SubComponentType::KEY_BLOB;
             match (sub_type, load_bits.load_public(), load_bits.load_km()) {
                 (SubComponentType::KEY_BLOB, _, true) => {
-                    km_blob = Some(row.get(2).context("Failed to extract KM blob.")?);
+                    key_blob = Some((
+                        row.get(0).context("Failed to extract key blob id.")?,
+                        row.get(2).context("Failed to extract key blob.")?,
+                    ));
                 }
                 (SubComponentType::CERT, true, _) => {
                     cert_blob =
@@ -1994,7 +2151,15 @@
         })
         .context("In load_blob_components.")?;
 
-        Ok((has_km_blob, km_blob, cert_blob, cert_chain_blob))
+        let blob_info = key_blob.map_or::<Result<_>, _>(Ok(None), |(blob_id, blob)| {
+            Ok(Some((
+                blob,
+                BlobMetaData::load_from_db(blob_id, tx)
+                    .context("In load_blob_components: Trying to load blob_metadata.")?,
+            )))
+        })?;
+
+        Ok((has_km_blob, blob_info, cert_blob, cert_chain_blob))
     }
 
     fn load_key_parameters(key_id: i64, tx: &Transaction) -> Result<Vec<KeyParameter>> {
@@ -2027,7 +2192,7 @@
     /// usage has been exhausted, if not, decreases the usage count. If the usage count reaches
     /// zero, the key also gets marked unreferenced and scheduled for deletion.
     /// Returns Ok(true) if the key was marked unreferenced as a hint to the garbage collector.
-    pub fn check_and_update_key_usage_count(&mut self, key_id: i64) -> Result<bool> {
+    pub fn check_and_update_key_usage_count(&mut self, key_id: i64) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let limit: Option<i32> = tx
                 .query_row(
@@ -2052,9 +2217,10 @@
 
             match limit {
                 1 => Self::mark_unreferenced(tx, key_id)
+                    .map(|need_gc| (need_gc, ()))
                     .context("Trying to mark limited use key for deletion."),
                 0 => Err(KsError::Km(ErrorCode::INVALID_KEY_BLOB)).context("Key is exhausted."),
-                _ => Ok(false),
+                _ => Ok(()).no_gc(),
             }
         })
         .context("In check_and_update_key_usage_count.")
@@ -2180,13 +2346,14 @@
 
     fn mark_unreferenced(tx: &Transaction, key_id: i64) -> Result<bool> {
         let updated = tx
-            .execute(
-                "UPDATE persistent.keyentry SET state = ? WHERE id = ?;",
-                params![KeyLifeCycle::Unreferenced, key_id],
-            )
-            .context("In mark_unreferenced: Failed to update state of key entry.")?;
-        tx.execute("DELETE from persistent.grant WHERE keyentryid = ?;", params![key_id])
-            .context("In mark_unreferenced: Failed to drop grants.")?;
+            .execute("DELETE FROM persistent.keyentry WHERE id = ?;", params![key_id])
+            .context("Trying to delete keyentry.")?;
+        tx.execute("DELETE FROM persistent.keymetadata WHERE keyentryid = ?;", params![key_id])
+            .context("Trying to delete keymetadata.")?;
+        tx.execute("DELETE FROM persistent.keyparameter WHERE keyentryid = ?;", params![key_id])
+            .context("Trying to delete keyparameters.")?;
+        tx.execute("DELETE FROM persistent.grant WHERE keyentryid = ?;", params![key_id])
+            .context("Trying to delete grants.")?;
         Ok(updated != 0)
     }
 
@@ -2198,7 +2365,7 @@
         key_type: KeyType,
         caller_uid: u32,
         check_permission: impl Fn(&KeyDescriptor, Option<KeyPermSet>) -> Result<()>,
-    ) -> Result<bool> {
+    ) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let (key_id, access_key_descriptor, access_vector) =
                 Self::load_access_tuple(tx, key, key_type, caller_uid)
@@ -2209,7 +2376,9 @@
             check_permission(&access_key_descriptor, access_vector)
                 .context("While checking permission.")?;
 
-            Self::mark_unreferenced(tx, key_id).context("Trying to mark the key unreferenced.")
+            Self::mark_unreferenced(tx, key_id)
+                .map(|need_gc| (need_gc, ()))
+                .context("Trying to mark the key unreferenced.")
         })
         .context("In unbind_key.")
     }
@@ -2230,7 +2399,7 @@
     ) -> Result<KeyEntry> {
         let metadata = KeyMetaData::load_from_db(key_id, &tx).context("In load_key_components.")?;
 
-        let (has_km_blob, km_blob, cert_blob, cert_chain_blob) =
+        let (has_km_blob, key_blob_info, cert_blob, cert_chain_blob) =
             Self::load_blob_components(key_id, load_bits, &tx)
                 .context("In load_key_components.")?;
 
@@ -2242,7 +2411,7 @@
 
         Ok(KeyEntry {
             id: key_id,
-            km_blob,
+            key_blob_info,
             cert: cert_blob,
             cert_chain: cert_chain_blob,
             km_uuid,
@@ -2279,7 +2448,7 @@
                 Ok(())
             })
             .context("In list: Failed to extract rows.")?;
-            Ok(descriptors)
+            Ok(descriptors).no_gc()
         })
     }
 
@@ -2349,6 +2518,7 @@
             };
 
             Ok(KeyDescriptor { domain: Domain::GRANT, nspace: grant_id, alias: None, blob: None })
+                .no_gc()
         })
     }
 
@@ -2380,7 +2550,7 @@
             )
             .context("Failed to delete grant.")?;
 
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2424,7 +2594,7 @@
                 ],
             )
             .context("In insert_auth_token: failed to insert auth token into the database")?;
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2460,10 +2630,11 @@
                         entry,
                         Self::get_last_off_body(tx)
                             .context("In find_auth_token_entry: Trying to get last off body")?,
-                    )));
+                    )))
+                    .no_gc();
                 }
             }
-            Ok(None)
+            Ok(None).no_gc()
         })
         .context("In find_auth_token_entry.")
     }
@@ -2476,7 +2647,7 @@
                 params!["last_off_body", last_off_body],
             )
             .context("In insert_last_off_body: failed to insert.")?;
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2488,7 +2659,7 @@
                 params![last_off_body, "last_off_body"],
             )
             .context("In update_last_off_body: failed to update.")?;
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2534,13 +2705,23 @@
     fn new_test_db() -> Result<KeystoreDB> {
         let conn = KeystoreDB::make_connection("file::memory:", "file::memory:")?;
 
-        let mut db = KeystoreDB { conn };
+        let mut db = KeystoreDB { conn, gc: None };
         db.with_transaction(TransactionBehavior::Immediate, |tx| {
-            KeystoreDB::init_tables(tx).context("Failed to initialize tables.")
+            KeystoreDB::init_tables(tx).context("Failed to initialize tables.").no_gc()
         })?;
         Ok(db)
     }
 
+    fn new_test_db_with_gc<F>(path: &Path, cb: F) -> Result<KeystoreDB>
+    where
+        F: Fn(&Uuid, &[u8]) -> Result<()> + Send + 'static,
+    {
+        let gc_db = KeystoreDB::new(path, None).expect("Failed to open test gc db_connection.");
+        let gc = Gc::new_init_with(Default::default(), move || (Box::new(cb), gc_db));
+
+        KeystoreDB::new(path, Some(gc))
+    }
+
     fn rebind_alias(
         db: &mut KeystoreDB,
         newid: &KeyIdGuard,
@@ -2549,7 +2730,7 @@
         namespace: i64,
     ) -> Result<bool> {
         db.with_transaction(TransactionBehavior::Immediate, |tx| {
-            KeystoreDB::rebind_alias(tx, newid, alias, &domain, &namespace)
+            KeystoreDB::rebind_alias(tx, newid, alias, &domain, &namespace).no_gc()
         })
         .context("In rebind_alias.")
     }
@@ -2601,12 +2782,13 @@
             .prepare("SELECT name from persistent.sqlite_master WHERE type='table' ORDER BY name;")?
             .query_map(params![], |row| row.get(0))?
             .collect::<rusqlite::Result<Vec<String>>>()?;
-        assert_eq!(tables.len(), 5);
+        assert_eq!(tables.len(), 6);
         assert_eq!(tables[0], "blobentry");
-        assert_eq!(tables[1], "grant");
-        assert_eq!(tables[2], "keyentry");
-        assert_eq!(tables[3], "keymetadata");
-        assert_eq!(tables[4], "keyparameter");
+        assert_eq!(tables[1], "blobmetadata");
+        assert_eq!(tables[2], "grant");
+        assert_eq!(tables[3], "keyentry");
+        assert_eq!(tables[4], "keymetadata");
+        assert_eq!(tables[5], "keyparameter");
         let tables = db
             .conn
             .prepare("SELECT name from perboot.sqlite_master WHERE type='table' ORDER BY name;")?
@@ -2696,13 +2878,13 @@
     #[test]
     fn test_persistence_for_files() -> Result<()> {
         let temp_dir = TempDir::new("persistent_db_test")?;
-        let mut db = KeystoreDB::new(temp_dir.path())?;
+        let mut db = KeystoreDB::new(temp_dir.path(), None)?;
 
         db.create_key_entry(&Domain::APP, &100, &KEYSTORE_UUID)?;
         let entries = get_keyentry(&db)?;
         assert_eq!(entries.len(), 1);
 
-        let db = KeystoreDB::new(temp_dir.path())?;
+        let db = KeystoreDB::new(temp_dir.path(), None)?;
 
         let entries_new = get_keyentry(&db)?;
         assert_eq!(entries, entries_new);
@@ -2833,7 +3015,9 @@
 
     #[test]
     fn test_remove_expired_certs() -> Result<()> {
-        let mut db = new_test_db()?;
+        let temp_dir =
+            TempDir::new("test_remove_expired_certs_").expect("Failed to create temp dir.");
+        let mut db = new_test_db_with_gc(temp_dir.path(), |_, _| Ok(()))?;
         let expiration_date: i64 =
             SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as i64 + 10000;
         let namespace: i64 = 30;
@@ -2847,11 +3031,20 @@
         )?;
         load_attestation_key_pool(&mut db, 45, namespace_del1, 0x02)?;
         load_attestation_key_pool(&mut db, 60, namespace_del2, 0x03)?;
+
+        let blob_entry_row_count: u32 = db
+            .conn
+            .query_row("SELECT COUNT(id) FROM persistent.blobentry;", NO_PARAMS, |row| row.get(0))
+            .expect("Failed to get blob entry row count.");
+        // We expect 6 rows here because there are two blobs per attestation key, i.e.,
+        // One key and one certificate.
+        assert_eq!(blob_entry_row_count, 6);
+
         assert_eq!(db.delete_expired_attestation_keys()?, 2);
 
         let mut cert_chain =
             db.retrieve_attestation_key_and_cert_chain(Domain::APP, namespace, &KEYSTORE_UUID)?;
-        assert_eq!(true, cert_chain.is_some());
+        assert!(cert_chain.is_some());
         let value = cert_chain.unwrap();
         assert_eq!(entry_values[1], value.cert_chain.to_vec());
         assert_eq!(entry_values[2], value.private_key.to_vec());
@@ -2861,26 +3054,25 @@
             namespace_del1,
             &KEYSTORE_UUID,
         )?;
-        assert_eq!(false, cert_chain.is_some());
+        assert!(!cert_chain.is_some());
         cert_chain = db.retrieve_attestation_key_and_cert_chain(
             Domain::APP,
             namespace_del2,
             &KEYSTORE_UUID,
         )?;
-        assert_eq!(false, cert_chain.is_some());
+        assert!(!cert_chain.is_some());
 
-        let mut option_entry = db.get_unreferenced_key()?;
-        assert_eq!(true, option_entry.is_some());
-        let (key_guard, _) = option_entry.unwrap();
-        db.purge_key_entry(key_guard)?;
+        // Give the garbage collector half a second to catch up.
+        std::thread::sleep(Duration::from_millis(500));
 
-        option_entry = db.get_unreferenced_key()?;
-        assert_eq!(true, option_entry.is_some());
-        let (key_guard, _) = option_entry.unwrap();
-        db.purge_key_entry(key_guard)?;
+        let blob_entry_row_count: u32 = db
+            .conn
+            .query_row("SELECT COUNT(id) FROM persistent.blobentry;", NO_PARAMS, |row| row.get(0))
+            .expect("Failed to get blob entry row count.");
+        // There shound be 2 blob entries left, because we deleted two of the attestation
+        // key entries with two blobs each.
+        assert_eq!(blob_entry_row_count, 2);
 
-        option_entry = db.get_unreferenced_key()?;
-        assert_eq!(false, option_entry.is_some());
         Ok(())
     }
 
@@ -3120,26 +3312,43 @@
     fn test_set_blob() -> Result<()> {
         let key_id = KEY_ID_LOCK.get(3000);
         let mut db = new_test_db()?;
-        db.set_blob(&key_id, SubComponentType::KEY_BLOB, Some(TEST_KEY_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB))?;
+        let mut blob_metadata = BlobMetaData::new();
+        blob_metadata.add(BlobMetaEntry::KmUuid(KEYSTORE_UUID));
+        db.set_blob(
+            &key_id,
+            SubComponentType::KEY_BLOB,
+            Some(TEST_KEY_BLOB),
+            Some(&blob_metadata),
+        )?;
+        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB), None)?;
+        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB), None)?;
         drop(key_id);
 
         let mut stmt = db.conn.prepare(
-            "SELECT subcomponent_type, keyentryid, blob FROM persistent.blobentry
+            "SELECT subcomponent_type, keyentryid, blob, id FROM persistent.blobentry
                 ORDER BY subcomponent_type ASC;",
         )?;
         let mut rows = stmt
-            .query_map::<(SubComponentType, i64, Vec<u8>), _, _>(NO_PARAMS, |row| {
-                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
+            .query_map::<((SubComponentType, i64, Vec<u8>), i64), _, _>(NO_PARAMS, |row| {
+                Ok(((row.get(0)?, row.get(1)?, row.get(2)?), row.get(3)?))
             })?;
-        let r = rows.next().unwrap().unwrap();
+        let (r, id) = rows.next().unwrap().unwrap();
         assert_eq!(r, (SubComponentType::KEY_BLOB, 3000, TEST_KEY_BLOB.to_vec()));
-        let r = rows.next().unwrap().unwrap();
+        let (r, _) = rows.next().unwrap().unwrap();
         assert_eq!(r, (SubComponentType::CERT, 3000, TEST_CERT_BLOB.to_vec()));
-        let r = rows.next().unwrap().unwrap();
+        let (r, _) = rows.next().unwrap().unwrap();
         assert_eq!(r, (SubComponentType::CERT_CHAIN, 3000, TEST_CERT_CHAIN_BLOB.to_vec()));
 
+        drop(rows);
+        drop(stmt);
+
+        assert_eq!(
+            db.with_transaction(TransactionBehavior::Immediate, |tx| {
+                BlobMetaData::load_from_db(id, tx).no_gc()
+            })
+            .expect("Should find blob metadata."),
+            blob_metadata
+        );
         Ok(())
     }
 
@@ -3564,7 +3773,7 @@
         let handle = {
             let temp_dir = Arc::new(TempDir::new("id_lock_test")?);
             let temp_dir_clone = temp_dir.clone();
-            let mut db = KeystoreDB::new(temp_dir.path())?;
+            let mut db = KeystoreDB::new(temp_dir.path(), None)?;
             let key_id = make_test_key_entry(&mut db, Domain::APP, 33, KEY_LOCK_TEST_ALIAS, None)
                 .context("test_insert_and_load_full_keyentry_domain_app")?
                 .0;
@@ -3595,7 +3804,7 @@
             // the primary thread.
             let handle = thread::spawn(move || {
                 let temp_dir = temp_dir_clone;
-                let mut db = KeystoreDB::new(temp_dir.path()).unwrap();
+                let mut db = KeystoreDB::new(temp_dir.path(), None).unwrap();
                 assert!(db
                     .load_key_entry(
                         &KeyDescriptor {
@@ -3638,8 +3847,8 @@
         let temp_dir =
             TempDir::new("test_database_busy_error_code_").expect("Failed to create temp dir.");
 
-        let mut db1 = KeystoreDB::new(temp_dir.path()).expect("Failed to open database1.");
-        let mut db2 = KeystoreDB::new(temp_dir.path()).expect("Failed to open database2.");
+        let mut db1 = KeystoreDB::new(temp_dir.path(), None).expect("Failed to open database1.");
+        let mut db2 = KeystoreDB::new(temp_dir.path(), None).expect("Failed to open database2.");
 
         let _tx1 = db1
             .conn
@@ -3799,7 +4008,7 @@
     #[test]
     fn list() -> Result<()> {
         let temp_dir = TempDir::new("list_test")?;
-        let mut db = KeystoreDB::new(temp_dir.path())?;
+        let mut db = KeystoreDB::new(temp_dir.path(), None)?;
         static LIST_O_ENTRIES: &[(Domain, i64, &str)] = &[
             (Domain::APP, 1, "test1"),
             (Domain::APP, 1, "test2"),
@@ -4198,18 +4407,27 @@
         max_usage_count: Option<i32>,
     ) -> Result<KeyIdGuard> {
         let key_id = db.create_key_entry(&domain, &namespace, &KEYSTORE_UUID)?;
-        db.set_blob(&key_id, SubComponentType::KEY_BLOB, Some(TEST_KEY_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB))?;
+        let mut blob_metadata = BlobMetaData::new();
+        blob_metadata.add(BlobMetaEntry::EncryptedBy(EncryptedBy::Password));
+        blob_metadata.add(BlobMetaEntry::Salt(vec![1, 2, 3]));
+        blob_metadata.add(BlobMetaEntry::Iv(vec![2, 3, 1]));
+        blob_metadata.add(BlobMetaEntry::AeadTag(vec![3, 1, 2]));
+        blob_metadata.add(BlobMetaEntry::KmUuid(KEYSTORE_UUID));
+
+        db.set_blob(
+            &key_id,
+            SubComponentType::KEY_BLOB,
+            Some(TEST_KEY_BLOB),
+            Some(&blob_metadata),
+        )?;
+        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB), None)?;
+        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB), None)?;
 
         let params = make_test_params(max_usage_count);
         db.insert_keyparameter(&key_id, &params)?;
 
         let mut metadata = KeyMetaData::new();
-        metadata.add(KeyMetaEntry::EncryptedBy(EncryptedBy::Password));
-        metadata.add(KeyMetaEntry::Salt(vec![1, 2, 3]));
-        metadata.add(KeyMetaEntry::Iv(vec![2, 3, 1]));
-        metadata.add(KeyMetaEntry::AeadTag(vec![3, 1, 2]));
+        metadata.add(KeyMetaEntry::CreationDate(DateTime::from_millis_epoch(123456789)));
         db.insert_key_metadata(&key_id, &metadata)?;
         rebind_alias(db, &key_id, alias, domain, namespace)?;
         Ok(key_id)
@@ -4218,15 +4436,19 @@
     fn make_test_key_entry_test_vector(key_id: i64, max_usage_count: Option<i32>) -> KeyEntry {
         let params = make_test_params(max_usage_count);
 
+        let mut blob_metadata = BlobMetaData::new();
+        blob_metadata.add(BlobMetaEntry::EncryptedBy(EncryptedBy::Password));
+        blob_metadata.add(BlobMetaEntry::Salt(vec![1, 2, 3]));
+        blob_metadata.add(BlobMetaEntry::Iv(vec![2, 3, 1]));
+        blob_metadata.add(BlobMetaEntry::AeadTag(vec![3, 1, 2]));
+        blob_metadata.add(BlobMetaEntry::KmUuid(KEYSTORE_UUID));
+
         let mut metadata = KeyMetaData::new();
-        metadata.add(KeyMetaEntry::EncryptedBy(EncryptedBy::Password));
-        metadata.add(KeyMetaEntry::Salt(vec![1, 2, 3]));
-        metadata.add(KeyMetaEntry::Iv(vec![2, 3, 1]));
-        metadata.add(KeyMetaEntry::AeadTag(vec![3, 1, 2]));
+        metadata.add(KeyMetaEntry::CreationDate(DateTime::from_millis_epoch(123456789)));
 
         KeyEntry {
             id: key_id,
-            km_blob: Some(TEST_KEY_BLOB.to_vec()),
+            key_blob_info: Some((TEST_KEY_BLOB.to_vec(), blob_metadata)),
             cert: Some(TEST_CERT_BLOB.to_vec()),
             cert_chain: Some(TEST_CERT_CHAIN_BLOB.to_vec()),
             km_uuid: KEYSTORE_UUID,
diff --git a/keystore2/src/enforcements.rs b/keystore2/src/enforcements.rs
index 13068c5..9c3bc89 100644
--- a/keystore2/src/enforcements.rs
+++ b/keystore2/src/enforcements.rs
@@ -14,13 +14,10 @@
 
 //! This is the Keystore 2.0 Enforcements module.
 // TODO: more description to follow.
+use crate::database::{AuthTokenEntry, MonotonicRawTime};
 use crate::error::{map_binder_status, Error, ErrorCode};
 use crate::globals::{get_timestamp_service, ASYNC_TASK, DB, ENFORCEMENTS};
 use crate::key_parameter::{KeyParameter, KeyParameterValue};
-use crate::{
-    database::{AuthTokenEntry, MonotonicRawTime},
-    gc::Gc,
-};
 use android_hardware_security_keymint::aidl::android::hardware::security::keymint::{
     Algorithm::Algorithm, ErrorCode::ErrorCode as Ec, HardwareAuthToken::HardwareAuthToken,
     HardwareAuthenticatorType::HardwareAuthenticatorType,
@@ -245,7 +242,7 @@
                 let token_receiver = TokenReceiver(Arc::downgrade(&auth_request));
                 ENFORCEMENTS.register_op_auth_receiver(challenge, token_receiver);
 
-                ASYNC_TASK.queue_hi(move || timestamp_token_request(challenge, sender));
+                ASYNC_TASK.queue_hi(move |_| timestamp_token_request(challenge, sender));
                 self.state = DeferredAuthState::Waiting(auth_request);
                 Some(OperationChallenge { challenge })
             }
@@ -253,7 +250,7 @@
                 let hat = (*hat).clone();
                 let (sender, receiver) = channel::<Result<TimeStampToken, Error>>();
                 let auth_request = AuthRequest::timestamp(hat, receiver);
-                ASYNC_TASK.queue_hi(move || timestamp_token_request(challenge, sender));
+                ASYNC_TASK.queue_hi(move |_| timestamp_token_request(challenge, sender));
                 self.state = DeferredAuthState::Waiting(auth_request);
                 None
             }
@@ -305,16 +302,12 @@
         if let Some(key_id) = self.key_usage_limited {
             // On the last successful use, the key gets deleted. In this case we
             // have to notify the garbage collector.
-            let need_gc = DB
-                .with(|db| {
-                    db.borrow_mut()
-                        .check_and_update_key_usage_count(key_id)
-                        .context("Trying to update key usage count.")
-                })
-                .context("In after_finish.")?;
-            if need_gc {
-                Gc::notify_gc();
-            }
+            DB.with(|db| {
+                db.borrow_mut()
+                    .check_and_update_key_usage_count(key_id)
+                    .context("Trying to update key usage count.")
+            })
+            .context("In after_finish.")?;
         }
         Ok(())
     }
diff --git a/keystore2/src/gc.rs b/keystore2/src/gc.rs
index fbb1cf6..b5b1c6c 100644
--- a/keystore2/src/gc.rs
+++ b/keystore2/src/gc.rs
@@ -18,80 +18,96 @@
 //! optionally dispose of sensitive key material appropriately, and then delete
 //! the key entry from the database.
 
-use crate::globals::{get_keymint_dev_by_uuid, DB};
-use crate::{error::map_km_error, globals::ASYNC_TASK};
-use android_hardware_security_keymint::aidl::android::hardware::security::keymint::IKeyMintDevice::IKeyMintDevice;
-use android_hardware_security_keymint::binder::Strong;
-use anyhow::Result;
+use crate::{
+    async_task,
+    database::{KeystoreDB, Uuid},
+};
+use anyhow::{Context, Result};
+use async_task::AsyncTask;
+use std::sync::Arc;
 
-#[derive(Clone, Copy)]
 pub struct Gc {
-    remaining_tries: u32,
+    async_task: Arc<AsyncTask>,
 }
 
 impl Gc {
-    const MAX_ERROR_RETRIES: u32 = 3u32;
+    /// Creates a garbage collector using the given async_task.
+    /// The garbage collector needs a function to invalidate key blobs and a database connection.
+    /// Both are obtained from the init function. The function is only called if this is first
+    /// time a garbage collector was initialized with the given AsyncTask instance.
+    pub fn new_init_with<F>(async_task: Arc<AsyncTask>, init: F) -> Self
+    where
+        F: FnOnce() -> (Box<dyn Fn(&Uuid, &[u8]) -> Result<()> + Send + 'static>, KeystoreDB)
+            + Send
+            + 'static,
+    {
+        let weak_at = Arc::downgrade(&async_task);
+        // Initialize the task's shelf.
+        async_task.queue_hi(move |shelf| {
+            let (invalidate_key, db) = init();
+            shelf.get_or_put_with(|| GcInternal {
+                blob_id_to_delete: None,
+                invalidate_key,
+                db,
+                async_task: weak_at,
+            });
+        });
+        Self { async_task }
+    }
 
-    /// Attempts to process one unreferenced key from the database.
-    /// Returns Ok(true) if a key was deleted and Ok(false) if there were no more keys to process.
+    /// Notifies the key garbage collector to iterate through orphaned and superseded blobs and
+    /// attempts their deletion. We only process one key at a time and then schedule another
+    /// attempt by queueing it in the async_task (low priority) queue.
+    pub fn notify_gc(&self) {
+        self.async_task.queue_lo(|shelf| shelf.get_downcast_mut::<GcInternal>().unwrap().step())
+    }
+}
+
+struct GcInternal {
+    blob_id_to_delete: Option<i64>,
+    invalidate_key: Box<dyn Fn(&Uuid, &[u8]) -> Result<()> + Send + 'static>,
+    db: KeystoreDB,
+    async_task: std::sync::Weak<AsyncTask>,
+}
+
+impl GcInternal {
+    /// Attempts to process one blob from the database.
     /// We process one key at a time, because deleting a key is a time consuming process which
     /// may involve calling into the KeyMint backend and we don't want to hog neither the backend
     /// nor the database for extended periods of time.
-    fn process_one_key() -> Result<bool> {
-        DB.with(|db| {
-            let mut db = db.borrow_mut();
-            if let Some((key_id, mut key_entry)) = db.get_unreferenced_key()? {
-                if let Some(blob) = key_entry.take_km_blob() {
-                    let km_dev: Strong<dyn IKeyMintDevice> =
-                        get_keymint_dev_by_uuid(key_entry.km_uuid())
-                            .map(|(dev, _)| dev)?
-                            .get_interface()?;
-                    if let Err(e) = map_km_error(km_dev.deleteKey(&blob)) {
-                        // Log but ignore error.
-                        log::error!("Error trying to delete key. {:?}", e);
-                    }
-                }
-                db.purge_key_entry(key_id)?;
-                return Ok(true);
-            }
-            Ok(false)
-        })
-    }
+    fn process_one_key(&mut self) -> Result<()> {
+        if let Some((blob_id, blob, blob_metadata)) = self
+            .db
+            .handle_next_superseded_blob(self.blob_id_to_delete.take())
+            .context("In process_one_key: Trying to handle superseded blob.")?
+        {
+            // Set the blob_id as the next to be deleted blob. So it will be
+            // removed from the database regardless of whether the following
+            // succeeds or not.
+            self.blob_id_to_delete = Some(blob_id);
 
-    /// Processes one key and then schedules another attempt until it runs out of tries or keys
-    /// to delete.
-    fn process_all(mut self) {
-        match Self::process_one_key() {
-            // We successfully removed a key.
-            Ok(true) => self.remaining_tries = Self::MAX_ERROR_RETRIES,
-            // There were no more keys to remove. We may exit.
-            Ok(false) => self.remaining_tries = 0,
-            // An error occurred. We retry in case the error was transient, but
-            // we also count down the number of tries so that we don't spin
-            // indefinitely.
-            Err(e) => {
-                self.remaining_tries -= 1;
-                log::error!(
-                    concat!(
-                        "Failed to delete key. Retrying in case this error was transient. ",
-                        "(Tries remaining {}) {:?}"
-                    ),
-                    self.remaining_tries,
-                    e
-                )
+            // If the key has a km_uuid we try to get the corresponding device
+            // and delete the key, unwrapping if necessary and possible.
+            // (At this time keys may get deleted without having the super encryption
+            // key in this case we can only delete the key from the database.)
+            if let Some(uuid) = blob_metadata.km_uuid() {
+                (self.invalidate_key)(&uuid, &*blob)
+                    .context("In process_one_key: Trying to invalidate key.")?;
             }
         }
-        if self.remaining_tries != 0 {
-            ASYNC_TASK.queue_lo(move || {
-                self.process_all();
-            })
-        }
+        Ok(())
     }
 
-    /// Notifies the key garbage collector to iterate through unreferenced keys and attempt
-    /// their deletion. We only process one key at a time and then schedule another
-    /// attempt by queueing it in the async_task (low priority) queue.
-    pub fn notify_gc() {
-        ASYNC_TASK.queue_lo(|| Self { remaining_tries: Self::MAX_ERROR_RETRIES }.process_all())
+    /// Processes one key and then schedules another attempt until it runs out of blobs to delete.
+    fn step(&mut self) {
+        if let Err(e) = self.process_one_key() {
+            log::error!("Error trying to delete blob entry. {:?}", e);
+        }
+        // Schedule the next step. This gives high priority requests a chance to interleave.
+        if self.blob_id_to_delete.is_some() {
+            if let Some(at) = self.async_task.upgrade() {
+                at.queue_lo(move |shelf| shelf.get_downcast_mut::<GcInternal>().unwrap().step());
+            }
+        }
     }
 }
diff --git a/keystore2/src/globals.rs b/keystore2/src/globals.rs
index 2afabbe..3037a03 100644
--- a/keystore2/src/globals.rs
+++ b/keystore2/src/globals.rs
@@ -28,13 +28,14 @@
 };
 use crate::{enforcements::Enforcements, error::map_km_error};
 use android_hardware_security_keymint::aidl::android::hardware::security::keymint::{
-    KeyMintHardwareInfo::KeyMintHardwareInfo, SecurityLevel::SecurityLevel,
+    IKeyMintDevice::IKeyMintDevice, KeyMintHardwareInfo::KeyMintHardwareInfo,
+    SecurityLevel::SecurityLevel,
 };
 use android_hardware_security_keymint::binder::{StatusCode, Strong};
 use android_security_compat::aidl::android::security::compat::IKeystoreCompatService::IKeystoreCompatService;
 use anyhow::{Context, Result};
 use lazy_static::lazy_static;
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex};
 use std::{cell::RefCell, sync::Once};
 use std::{collections::HashMap, path::Path, path::PathBuf};
 
@@ -43,10 +44,28 @@
 /// Open a connection to the Keystore 2.0 database. This is called during the initialization of
 /// the thread local DB field. It should never be called directly. The first time this is called
 /// we also call KeystoreDB::cleanup_leftovers to restore the key lifecycle invariant. See the
-/// documentation of cleanup_leftovers for more details.
+/// documentation of cleanup_leftovers for more details. The function also constructs a blob
+/// garbage collector. The initializing closure constructs another database connection without
+/// a gc. Although one GC is created for each thread local database connection, this closure
+/// is run only once, as long as the ASYNC_TASK instance is the same. So only one additional
+/// database connection is created for the garbage collector worker.
 fn create_thread_local_db() -> KeystoreDB {
-    let mut db = KeystoreDB::new(&DB_PATH.lock().expect("Could not get the database directory."))
-        .expect("Failed to open database.");
+    let gc = Gc::new_init_with(ASYNC_TASK.clone(), || {
+        (
+            Box::new(|uuid, blob| {
+                let km_dev: Strong<dyn IKeyMintDevice> =
+                    get_keymint_dev_by_uuid(uuid).map(|(dev, _)| dev)?.get_interface()?;
+                map_km_error(km_dev.deleteKey(&*blob))
+                    .context("In invalidate key closure: Trying to invalidate key blob.")
+            }),
+            KeystoreDB::new(&DB_PATH.lock().expect("Could not get the database directory."), None)
+                .expect("Failed to open database."),
+        )
+    });
+
+    let mut db =
+        KeystoreDB::new(&DB_PATH.lock().expect("Could not get the database directory."), Some(gc))
+            .expect("Failed to open database.");
     DB_INIT.call_once(|| {
         log::info!("Touching Keystore 2.0 database for this first time since boot.");
         db.insert_last_off_body(MonotonicRawTime::now())
@@ -62,7 +81,6 @@
                 n
             );
         }
-        Gc::notify_gc();
     });
     db
 }
@@ -113,14 +131,14 @@
     pub static ref DB_PATH: Mutex<PathBuf> = Mutex::new(
         Path::new("/data/misc/keystore").to_path_buf());
     /// Runtime database of unwrapped super keys.
-    pub static ref SUPER_KEY: SuperKeyManager = Default::default();
+    pub static ref SUPER_KEY: Arc<SuperKeyManager> = Default::default();
     /// Map of KeyMint devices.
     static ref KEY_MINT_DEVICES: Mutex<DevicesMap> = Default::default();
     /// Timestamp service.
     static ref TIME_STAMP_DEVICE: Mutex<Option<Asp>> = Default::default();
     /// A single on-demand worker thread that handles deferred tasks with two different
     /// priorities.
-    pub static ref ASYNC_TASK: AsyncTask = Default::default();
+    pub static ref ASYNC_TASK: Arc<AsyncTask> = Default::default();
     /// Singleton for enforcements.
     pub static ref ENFORCEMENTS: Enforcements = Enforcements::new();
     /// LegacyBlobLoader is initialized and exists globally.
diff --git a/keystore2/src/legacy_blob.rs b/keystore2/src/legacy_blob.rs
index 230a82c..ebd063c 100644
--- a/keystore2/src/legacy_blob.rs
+++ b/keystore2/src/legacy_blob.rs
@@ -938,7 +938,7 @@
         )?;
 
         let key_manager = crate::super_key::SuperKeyManager::new();
-        let mut db = crate::database::KeystoreDB::new(temp_dir.path())?;
+        let mut db = crate::database::KeystoreDB::new(temp_dir.path(), None)?;
         let legacy_blob_loader = LegacyBlobLoader::new(temp_dir.path());
 
         assert_eq!(
diff --git a/keystore2/src/security_level.rs b/keystore2/src/security_level.rs
index 417e3c5..c2170b6 100644
--- a/keystore2/src/security_level.rs
+++ b/keystore2/src/security_level.rs
@@ -16,7 +16,7 @@
 
 //! This crate implements the IKeystoreSecurityLevel interface.
 
-use crate::{database::Uuid, gc::Gc, globals::get_keymint_device};
+use crate::globals::get_keymint_device;
 use android_hardware_security_keymint::aidl::android::hardware::security::keymint::{
     Algorithm::Algorithm, HardwareAuthenticatorType::HardwareAuthenticatorType,
     IKeyMintDevice::IKeyMintDevice, KeyCreationResult::KeyCreationResult, KeyFormat::KeyFormat,
@@ -36,17 +36,17 @@
 use crate::key_parameter::KeyParameterValue as KsKeyParamValue;
 use crate::utils::{check_key_permission, uid_to_android_user, Asp};
 use crate::{
-    database::{CertificateInfo, KeyIdGuard},
-    globals::DB,
-};
-use crate::{
-    database::{DateTime, KeyMetaData, KeyMetaEntry, KeyType},
+    database::{
+        BlobMetaData, BlobMetaEntry, DateTime, KeyEntry, KeyEntryLoadBits, KeyMetaData,
+        KeyMetaEntry, KeyType, SubComponentType, Uuid,
+    },
+    operation::KeystoreOperation,
+    operation::OperationDb,
     permission::KeyPerm,
 };
 use crate::{
-    database::{KeyEntry, KeyEntryLoadBits, SubComponentType},
-    operation::KeystoreOperation,
-    operation::OperationDb,
+    database::{CertificateInfo, KeyIdGuard},
+    globals::DB,
 };
 use crate::{
     error::{self, map_km_error, map_or_log_err, Error, ErrorCode},
@@ -138,23 +138,22 @@
             }
             _ => DB
                 .with::<_, Result<KeyDescriptor>>(|db| {
-                    let mut metadata = KeyMetaData::new();
-                    metadata.add(KeyMetaEntry::CreationDate(creation_date));
+                    let mut key_metadata = KeyMetaData::new();
+                    key_metadata.add(KeyMetaEntry::CreationDate(creation_date));
+                    let mut blob_metadata = BlobMetaData::new();
+                    blob_metadata.add(BlobMetaEntry::KmUuid(self.km_uuid));
 
                     let mut db = db.borrow_mut();
-                    let (need_gc, key_id) = db
+                    let key_id = db
                         .store_new_key(
                             &key,
                             &key_parameters,
-                            &key_blob,
+                            &(&key_blob, &blob_metadata),
                             &cert_info,
-                            &metadata,
+                            &key_metadata,
                             &self.km_uuid,
                         )
                         .context("In store_new_key.")?;
-                    if need_gc {
-                        Gc::notify_gc();
-                    }
                     Ok(KeyDescriptor {
                         domain: Domain::KEY_ID,
                         nspace: key_id.id(),
@@ -185,7 +184,7 @@
         // so that we can use it by reference like the blob provided by the key descriptor.
         // Otherwise, we would have to clone the blob from the key descriptor.
         let scoping_blob: Vec<u8>;
-        let (km_blob, key_properties, key_id_guard) = match key.domain {
+        let (km_blob, key_properties, key_id_guard, blob_metadata) = match key.domain {
             Domain::BLOB => {
                 check_key_permission(KeyPerm::use_(), key, &None)
                     .context("In create_operation: checking use permission for Domain::BLOB.")?;
@@ -201,6 +200,7 @@
                     },
                     None,
                     None,
+                    None,
                 )
             }
             _ => {
@@ -215,19 +215,19 @@
                         )
                     })
                     .context("In create_operation: Failed to load key blob.")?;
-                scoping_blob = match key_entry.take_km_blob() {
-                    Some(blob) => blob,
-                    None => {
-                        return Err(Error::sys()).context(concat!(
-                            "In create_operation: Successfully loaded key entry,",
-                            " but KM blob was missing."
-                        ))
-                    }
-                };
+
+                let (blob, blob_metadata) =
+                    key_entry.take_key_blob_info().ok_or_else(Error::sys).context(concat!(
+                        "In create_operation: Successfully loaded key entry, ",
+                        "but KM blob was missing."
+                    ))?;
+                scoping_blob = blob;
+
                 (
                     &scoping_blob,
                     Some((key_id_guard.id(), key_entry.into_key_parameters())),
                     Some(key_id_guard),
+                    Some(blob_metadata),
                 )
             }
         };
@@ -265,7 +265,7 @@
             .upgrade_keyblob_if_required_with(
                 &*km_dev,
                 key_id_guard,
-                &km_blob,
+                &(km_blob, blob_metadata.as_ref()),
                 &operation_parameters,
                 |blob| loop {
                     match map_km_error(km_dev.begin(
@@ -498,7 +498,7 @@
             _ => panic!("Unreachable."),
         };
 
-        // import_wrapped_key requires the rebind permission for the new key.
+        // Import_wrapped_key requires the rebind permission for the new key.
         check_key_permission(KeyPerm::rebind(), &key, &None).context("In import_wrapped_key.")?;
 
         let (wrapping_key_id_guard, wrapping_key_entry) = DB
@@ -512,8 +512,8 @@
                 )
             })
             .context("Failed to load wrapping key.")?;
-        let wrapping_key_blob = match wrapping_key_entry.km_blob() {
-            Some(blob) => blob,
+        let (wrapping_key_blob, wrapping_blob_metadata) = match wrapping_key_entry.key_blob_info() {
+            Some((blob, metadata)) => (blob, metadata),
             None => {
                 return Err(error::Error::sys()).context(concat!(
                     "No km_blob after successfully loading key.",
@@ -549,12 +549,12 @@
             .upgrade_keyblob_if_required_with(
                 &*km_dev,
                 Some(wrapping_key_id_guard),
-                wrapping_key_blob,
+                &(&wrapping_key_blob, Some(&wrapping_blob_metadata)),
                 &[],
                 |wrapping_blob| {
                     let creation_result = map_km_error(km_dev.importWrappedKey(
                         wrapped_data,
-                        wrapping_key_blob,
+                        wrapping_blob,
                         masking_key,
                         &params,
                         pw_sid,
@@ -574,16 +574,16 @@
         &self,
         km_dev: &dyn IKeyMintDevice,
         key_id_guard: Option<KeyIdGuard>,
-        blob: &[u8],
+        blob_info: &(&[u8], Option<&BlobMetaData>),
         params: &[KeyParameter],
         f: F,
     ) -> Result<(T, Option<Vec<u8>>)>
     where
         F: Fn(&[u8]) -> Result<T, Error>,
     {
-        match f(blob) {
+        match f(blob_info.0) {
             Err(Error::Km(ErrorCode::KEY_REQUIRES_UPGRADE)) => {
-                let upgraded_blob = map_km_error(km_dev.upgradeKey(blob, params))
+                let upgraded_blob = map_km_error(km_dev.upgradeKey(blob_info.0, params))
                     .context("In upgrade_keyblob_if_required_with: Upgrade failed.")?;
                 key_id_guard.map_or(Ok(()), |key_id_guard| {
                     DB.with(|db| {
@@ -591,6 +591,7 @@
                             &key_id_guard,
                             SubComponentType::KEY_BLOB,
                             Some(&upgraded_blob),
+                            blob_info.1,
                         )
                     })
                     .context(concat!(
diff --git a/keystore2/src/service.rs b/keystore2/src/service.rs
index 6aa7ed5..efd62e3 100644
--- a/keystore2/src/service.rs
+++ b/keystore2/src/service.rs
@@ -20,6 +20,7 @@
 
 use std::collections::HashMap;
 
+use crate::error::{self, map_or_log_err, ErrorCode};
 use crate::permission::{KeyPerm, KeystorePerm};
 use crate::security_level::KeystoreSecurityLevel;
 use crate::utils::{
@@ -32,10 +33,6 @@
     database::{KeyEntryLoadBits, KeyType, SubComponentType},
     error::ResponseCode,
 };
-use crate::{
-    error::{self, map_or_log_err, ErrorCode},
-    gc::Gc,
-};
 use android_hardware_security_keymint::aidl::android::hardware::security::keymint::SecurityLevel::SecurityLevel;
 use android_system_keystore2::aidl::android::system::keystore2::{
     Domain::Domain, IKeystoreSecurityLevel::IKeystoreSecurityLevel,
@@ -185,10 +182,10 @@
             .context("Failed to load key entry.")?;
 
             if let Some((key_id_guard, key_entry)) = entry {
-                db.set_blob(&key_id_guard, SubComponentType::CERT, public_cert)
+                db.set_blob(&key_id_guard, SubComponentType::CERT, public_cert, None)
                     .context("Failed to update cert subcomponent.")?;
 
-                db.set_blob(&key_id_guard, SubComponentType::CERT_CHAIN, certificate_chain)
+                db.set_blob(&key_id_guard, SubComponentType::CERT_CHAIN, certificate_chain, None)
                     .context("Failed to update cert chain subcomponent.")?;
                 return Ok(());
             }
@@ -269,16 +266,12 @@
 
     fn delete_key(&self, key: &KeyDescriptor) -> Result<()> {
         let caller_uid = ThreadState::get_calling_uid();
-        let need_gc = DB
-            .with(|db| {
-                db.borrow_mut().unbind_key(&key, KeyType::Client, caller_uid, |k, av| {
-                    check_key_permission(KeyPerm::delete(), k, &av).context("During delete_key.")
-                })
+        DB.with(|db| {
+            db.borrow_mut().unbind_key(&key, KeyType::Client, caller_uid, |k, av| {
+                check_key_permission(KeyPerm::delete(), k, &av).context("During delete_key.")
             })
-            .context("In delete_key: Trying to unbind the key.")?;
-        if need_gc {
-            Gc::notify_gc();
-        }
+        })
+        .context("In delete_key: Trying to unbind the key.")?;
         Ok(())
     }
 
diff --git a/keystore2/src/super_key.rs b/keystore2/src/super_key.rs
index 9872513..79f53bf 100644
--- a/keystore2/src/super_key.rs
+++ b/keystore2/src/super_key.rs
@@ -15,7 +15,7 @@
 #![allow(dead_code)]
 
 use crate::{
-    database::EncryptedBy, database::KeyMetaData, database::KeyMetaEntry, database::KeystoreDB,
+    database::BlobMetaData, database::BlobMetaEntry, database::EncryptedBy, database::KeystoreDB,
     error::Error, error::ResponseCode, legacy_blob::LegacyBlobLoader,
 };
 use android_system_keystore2::aidl::android::system::keystore2::Domain::Domain;
@@ -141,49 +141,50 @@
                         generate_salt().context("In create_new_key: Failed to generate salt.")?;
                     let derived_key = derive_key_from_password(pw, Some(&salt), AES_256_KEY_LENGTH)
                         .context("In create_new_key: Failed to derive password.")?;
-                    let mut metadata = KeyMetaData::new();
-                    metadata.add(KeyMetaEntry::EncryptedBy(EncryptedBy::Password));
-                    metadata.add(KeyMetaEntry::Salt(salt));
+                    let mut metadata = BlobMetaData::new();
+                    metadata.add(BlobMetaEntry::EncryptedBy(EncryptedBy::Password));
+                    metadata.add(BlobMetaEntry::Salt(salt));
                     let (encrypted_key, iv, tag) = aes_gcm_encrypt(&super_key, &derived_key)
                         .context("In create_new_key: Failed to encrypt new super key.")?;
-                    metadata.add(KeyMetaEntry::Iv(iv));
-                    metadata.add(KeyMetaEntry::AeadTag(tag));
+                    metadata.add(BlobMetaEntry::Iv(iv));
+                    metadata.add(BlobMetaEntry::AeadTag(tag));
                     Ok((encrypted_key, metadata))
                 },
             )
             .context("In unlock_user_key: Failed to get key id.")?;
 
-        let metadata = entry.metadata();
-        let super_key = match (
-            metadata.encrypted_by(),
-            metadata.salt(),
-            metadata.iv(),
-            metadata.aead_tag(),
-            entry.km_blob(),
-        ) {
-            (Some(&EncryptedBy::Password), Some(salt), Some(iv), Some(tag), Some(blob)) => {
-                let key = derive_key_from_password(pw, Some(salt), AES_256_KEY_LENGTH)
-                    .context("In unlock_user_key: Failed to generate key from password.")?;
+        if let Some((ref blob, ref metadata)) = entry.key_blob_info() {
+            let super_key = match (
+                metadata.encrypted_by(),
+                metadata.salt(),
+                metadata.iv(),
+                metadata.aead_tag(),
+            ) {
+                (Some(&EncryptedBy::Password), Some(salt), Some(iv), Some(tag)) => {
+                    let key = derive_key_from_password(pw, Some(salt), AES_256_KEY_LENGTH)
+                        .context("In unlock_user_key: Failed to generate key from password.")?;
 
-                aes_gcm_decrypt(blob, iv, tag, &key)
-                    .context("In unlock_user_key: Failed to decrypt key blob.")?
-            }
-            (enc_by, salt, iv, tag, blob) => {
-                return Err(Error::Rc(ResponseCode::VALUE_CORRUPTED)).context(format!(
-                    concat!(
-                        "In unlock_user_key: Super key has incomplete metadata.",
-                        "Present: encrypted_by: {}, salt: {}, iv: {}, aead_tag: {}, blob: {}."
-                    ),
-                    enc_by.is_some(),
-                    salt.is_some(),
-                    iv.is_some(),
-                    tag.is_some(),
-                    blob.is_some()
-                ));
-            }
-        };
-
-        self.install_per_boot_key_for_user(user, entry.id(), super_key);
+                    aes_gcm_decrypt(blob, iv, tag, &key)
+                        .context("In unlock_user_key: Failed to decrypt key blob.")?
+                }
+                (enc_by, salt, iv, tag) => {
+                    return Err(Error::Rc(ResponseCode::VALUE_CORRUPTED)).context(format!(
+                        concat!(
+                            "In unlock_user_key: Super key has incomplete metadata.",
+                            "Present: encrypted_by: {}, salt: {}, iv: {}, aead_tag: {}."
+                        ),
+                        enc_by.is_some(),
+                        salt.is_some(),
+                        iv.is_some(),
+                        tag.is_some(),
+                    ));
+                }
+            };
+            self.install_per_boot_key_for_user(user, entry.id(), super_key);
+        } else {
+            return Err(Error::Rc(ResponseCode::VALUE_CORRUPTED))
+                .context("In unlock_user_key: Key entry has no key blob.");
+        }
 
         Ok(())
     }
@@ -192,7 +193,7 @@
     /// The function queries `metadata.encrypted_by()` to determine the encryption key.
     /// It then check if the required key is memory resident, and if so decrypts the
     /// blob.
-    pub fn unwrap_key(&self, blob: &[u8], metadata: &KeyMetaData) -> Result<ZVec> {
+    pub fn unwrap_key(&self, blob: &[u8], metadata: &BlobMetaData) -> Result<ZVec> {
         match metadata.encrypted_by() {
             Some(EncryptedBy::KeyId(key_id)) => match self.get_key(key_id) {
                 Some(key) => {
@@ -207,7 +208,7 @@
     }
 
     /// Unwraps an encrypted key blob given an encryption key.
-    fn unwrap_key_with_key(blob: &[u8], metadata: &KeyMetaData, key: &[u8]) -> Result<ZVec> {
+    fn unwrap_key_with_key(blob: &[u8], metadata: &BlobMetaData, key: &[u8]) -> Result<ZVec> {
         match (metadata.iv(), metadata.aead_tag()) {
             (Some(iv), Some(tag)) => aes_gcm_decrypt(blob, iv, tag, key)
                 .context("In unwrap_key_with_key: Failed to decrypt the key blob."),