[PATCH] keys: Discard key spinlock and use RCU for key payload

The attached patch changes the key implementation in a number of ways:

 (1) It removes the spinlock from the key structure.

 (2) The key flags are now accessed using atomic bitops instead of
     write-locking the key spinlock and using C bitwise operators.

     The three instantiation flags are dealt with with the construction
     semaphore held during the request_key/instantiate/negate sequence, thus
     rendering the spinlock superfluous.

     The key flags are also now bit numbers not bit masks.

 (3) The key payload is now accessed using RCU. This permits the recursive
     keyring search algorithm to be simplified greatly since no locks need be
     taken other than the usual RCU preemption disablement. Searching now does
     not require any locks or semaphores to be held; merely that the starting
     keyring be pinned.

 (4) The keyring payload now includes an RCU head so that it can be disposed
     of by call_rcu(). This requires that the payload be copied on unlink to
     prevent introducing races in copy-down vs search-up.

 (5) The user key payload is now a structure with the data following it. It
     includes an RCU head like the keyring payload and for the same reason. It
     also contains a data length because the data length in the key may be
     changed on another CPU whilst an RCU protected read is in progress on the
     payload. This would then see the supposed RCU payload and the on-key data
     length getting out of sync.

     I'm tempted to drop the key's datalen entirely, except that it's used in
     conjunction with quota management and so is a little tricky to get rid
     of.

 (6) Update the keys documentation.

Signed-Off-By: David Howells <dhowells@redhat.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
diff --git a/security/keys/keyring.c b/security/keys/keyring.c
index e2ab4f8..c9a5de1 100644
--- a/security/keys/keyring.c
+++ b/security/keys/keyring.c
@@ -132,10 +132,17 @@
 		(PAGE_SIZE - sizeof(*klist)) / sizeof(struct key);
 
 	ret = 0;
-	sklist = source->payload.subscriptions;
 
-	if (sklist && sklist->nkeys > 0) {
+	/* find out how many keys are currently linked */
+	rcu_read_lock();
+	sklist = rcu_dereference(source->payload.subscriptions);
+	max = 0;
+	if (sklist)
 		max = sklist->nkeys;
+	rcu_read_unlock();
+
+	/* allocate a new payload and stuff load with key links */
+	if (max > 0) {
 		BUG_ON(max > limit);
 
 		max = (max + 3) & ~3;
@@ -148,6 +155,10 @@
 		if (!klist)
 			goto error;
 
+		/* set links */
+		rcu_read_lock();
+		sklist = rcu_dereference(source->payload.subscriptions);
+
 		klist->maxkeys = max;
 		klist->nkeys = sklist->nkeys;
 		memcpy(klist->keys,
@@ -157,7 +168,9 @@
 		for (loop = klist->nkeys - 1; loop >= 0; loop--)
 			atomic_inc(&klist->keys[loop]->usage);
 
-		keyring->payload.subscriptions = klist;
+		rcu_read_unlock();
+
+		rcu_assign_pointer(keyring->payload.subscriptions, klist);
 		ret = 0;
 	}
 
@@ -192,7 +205,7 @@
 		write_unlock(&keyring_name_lock);
 	}
 
-	klist = keyring->payload.subscriptions;
+	klist = rcu_dereference(keyring->payload.subscriptions);
 	if (klist) {
 		for (loop = klist->nkeys - 1; loop >= 0; loop--)
 			key_put(klist->keys[loop]);
@@ -216,17 +229,20 @@
 		seq_puts(m, "[anon]");
 	}
 
-	klist = keyring->payload.subscriptions;
+	rcu_read_lock();
+	klist = rcu_dereference(keyring->payload.subscriptions);
 	if (klist)
 		seq_printf(m, ": %u/%u", klist->nkeys, klist->maxkeys);
 	else
 		seq_puts(m, ": empty");
+	rcu_read_unlock();
 
 } /* end keyring_describe() */
 
 /*****************************************************************************/
 /*
  * read a list of key IDs from the keyring's contents
+ * - the keyring's semaphore is read-locked
  */
 static long keyring_read(const struct key *keyring,
 			 char __user *buffer, size_t buflen)
@@ -237,7 +253,7 @@
 	int loop, ret;
 
 	ret = 0;
-	klist = keyring->payload.subscriptions;
+	klist = rcu_dereference(keyring->payload.subscriptions);
 
 	if (klist) {
 		/* calculate how much data we could return */
@@ -320,7 +336,7 @@
 			       key_match_func_t match)
 {
 	struct {
-		struct key *keyring;
+		struct keyring_list *keylist;
 		int kix;
 	} stack[KEYRING_SEARCH_MAX_DEPTH];
 
@@ -328,10 +344,12 @@
 	struct timespec now;
 	struct key *key;
 	long err;
-	int sp, psp, kix;
+	int sp, kix;
 
 	key_check(keyring);
 
+	rcu_read_lock();
+
 	/* top keyring must have search permission to begin the search */
 	key = ERR_PTR(-EACCES);
 	if (!key_permission(keyring, KEY_SEARCH))
@@ -347,11 +365,10 @@
 
 	/* start processing a new keyring */
  descend:
-	read_lock(&keyring->lock);
-	if (keyring->flags & KEY_FLAG_REVOKED)
+	if (test_bit(KEY_FLAG_REVOKED, &keyring->flags))
 		goto not_this_keyring;
 
-	keylist = keyring->payload.subscriptions;
+	keylist = rcu_dereference(keyring->payload.subscriptions);
 	if (!keylist)
 		goto not_this_keyring;
 
@@ -364,7 +381,7 @@
 			continue;
 
 		/* skip revoked keys and expired keys */
-		if (key->flags & KEY_FLAG_REVOKED)
+		if (test_bit(KEY_FLAG_REVOKED, &key->flags))
 			continue;
 
 		if (key->expiry && now.tv_sec >= key->expiry)
@@ -379,7 +396,7 @@
 			continue;
 
 		/* we set a different error code if we find a negative key */
-		if (key->flags & KEY_FLAG_NEGATIVE) {
+		if (test_bit(KEY_FLAG_NEGATIVE, &key->flags)) {
 			err = -ENOKEY;
 			continue;
 		}
@@ -390,48 +407,37 @@
 	/* search through the keyrings nested in this one */
 	kix = 0;
  ascend:
-	while (kix < keylist->nkeys) {
+	for (; kix < keylist->nkeys; kix++) {
 		key = keylist->keys[kix];
 		if (key->type != &key_type_keyring)
-			goto next;
+			continue;
 
 		/* recursively search nested keyrings
 		 * - only search keyrings for which we have search permission
 		 */
 		if (sp >= KEYRING_SEARCH_MAX_DEPTH)
-			goto next;
+			continue;
 
 		if (!key_permission(key, KEY_SEARCH))
-			goto next;
-
-		/* evade loops in the keyring tree */
-		for (psp = 0; psp < sp; psp++)
-			if (stack[psp].keyring == keyring)
-				goto next;
+			continue;
 
 		/* stack the current position */
-		stack[sp].keyring = keyring;
+		stack[sp].keylist = keylist;
 		stack[sp].kix = kix;
 		sp++;
 
 		/* begin again with the new keyring */
 		keyring = key;
 		goto descend;
-
-	next:
-		kix++;
 	}
 
 	/* the keyring we're looking at was disqualified or didn't contain a
 	 * matching key */
  not_this_keyring:
-	read_unlock(&keyring->lock);
-
 	if (sp > 0) {
 		/* resume the processing of a keyring higher up in the tree */
 		sp--;
-		keyring = stack[sp].keyring;
-		keylist = keyring->payload.subscriptions;
+		keylist = stack[sp].keylist;
 		kix = stack[sp].kix + 1;
 		goto ascend;
 	}
@@ -442,16 +448,9 @@
 	/* we found a viable match */
  found:
 	atomic_inc(&key->usage);
-	read_unlock(&keyring->lock);
-
-	/* unwind the keyring stack */
-	while (sp > 0) {
-		sp--;
-		read_unlock(&stack[sp].keyring->lock);
-	}
-
 	key_check(key);
  error:
+	rcu_read_unlock();
 	return key;
 
 } /* end keyring_search_aux() */
@@ -489,7 +488,9 @@
 	struct key *key;
 	int loop;
 
-	klist = keyring->payload.subscriptions;
+	rcu_read_lock();
+
+	klist = rcu_dereference(keyring->payload.subscriptions);
 	if (klist) {
 		for (loop = 0; loop < klist->nkeys; loop++) {
 			key = klist->keys[loop];
@@ -497,7 +498,7 @@
 			if (key->type == ktype &&
 			    key->type->match(key, description) &&
 			    key_permission(key, perm) &&
-			    !(key->flags & KEY_FLAG_REVOKED)
+			    !test_bit(KEY_FLAG_REVOKED, &key->flags)
 			    )
 				goto found;
 		}
@@ -509,6 +510,7 @@
  found:
 	atomic_inc(&key->usage);
  error:
+	rcu_read_unlock();
 	return key;
 
 } /* end __keyring_search_one() */
@@ -540,7 +542,7 @@
 				    &keyring_name_hash[bucket],
 				    type_data.link
 				    ) {
-			if (keyring->flags & KEY_FLAG_REVOKED)
+			if (test_bit(KEY_FLAG_REVOKED, &keyring->flags))
 				continue;
 
 			if (strcmp(keyring->description, name) != 0)
@@ -579,7 +581,7 @@
 static int keyring_detect_cycle(struct key *A, struct key *B)
 {
 	struct {
-		struct key *subtree;
+		struct keyring_list *keylist;
 		int kix;
 	} stack[KEYRING_SEARCH_MAX_DEPTH];
 
@@ -587,20 +589,21 @@
 	struct key *subtree, *key;
 	int sp, kix, ret;
 
+	rcu_read_lock();
+
 	ret = -EDEADLK;
 	if (A == B)
-		goto error;
+		goto cycle_detected;
 
 	subtree = B;
 	sp = 0;
 
 	/* start processing a new keyring */
  descend:
-	read_lock(&subtree->lock);
-	if (subtree->flags & KEY_FLAG_REVOKED)
+	if (test_bit(KEY_FLAG_REVOKED, &subtree->flags))
 		goto not_this_keyring;
 
-	keylist = subtree->payload.subscriptions;
+	keylist = rcu_dereference(subtree->payload.subscriptions);
 	if (!keylist)
 		goto not_this_keyring;
 	kix = 0;
@@ -619,7 +622,7 @@
 				goto too_deep;
 
 			/* stack the current position */
-			stack[sp].subtree = subtree;
+			stack[sp].keylist = keylist;
 			stack[sp].kix = kix;
 			sp++;
 
@@ -632,13 +635,10 @@
 	/* the keyring we're looking at was disqualified or didn't contain a
 	 * matching key */
  not_this_keyring:
-	read_unlock(&subtree->lock);
-
 	if (sp > 0) {
 		/* resume the checking of a keyring higher up in the tree */
 		sp--;
-		subtree = stack[sp].subtree;
-		keylist = subtree->payload.subscriptions;
+		keylist = stack[sp].keylist;
 		kix = stack[sp].kix + 1;
 		goto ascend;
 	}
@@ -646,30 +646,36 @@
 	ret = 0; /* no cycles detected */
 
  error:
+	rcu_read_unlock();
 	return ret;
 
  too_deep:
 	ret = -ELOOP;
-	goto error_unwind;
+	goto error;
+
  cycle_detected:
 	ret = -EDEADLK;
- error_unwind:
-	read_unlock(&subtree->lock);
-
-	/* unwind the keyring stack */
-	while (sp > 0) {
-		sp--;
-		read_unlock(&stack[sp].subtree->lock);
-	}
-
 	goto error;
 
 } /* end keyring_detect_cycle() */
 
 /*****************************************************************************/
 /*
+ * dispose of a keyring list after the RCU grace period
+ */
+static void keyring_link_rcu_disposal(struct rcu_head *rcu)
+{
+	struct keyring_list *klist =
+		container_of(rcu, struct keyring_list, rcu);
+
+	kfree(klist);
+
+} /* end keyring_link_rcu_disposal() */
+
+/*****************************************************************************/
+/*
  * link a key into to a keyring
- * - must be called with the keyring's semaphore held
+ * - must be called with the keyring's semaphore write-locked
  */
 int __key_link(struct key *keyring, struct key *key)
 {
@@ -679,7 +685,7 @@
 	int ret;
 
 	ret = -EKEYREVOKED;
-	if (keyring->flags & KEY_FLAG_REVOKED)
+	if (test_bit(KEY_FLAG_REVOKED, &keyring->flags))
 		goto error;
 
 	ret = -ENOTDIR;
@@ -710,9 +716,10 @@
 		/* there's sufficient slack space to add directly */
 		atomic_inc(&key->usage);
 
-		write_lock(&keyring->lock);
-		klist->keys[klist->nkeys++] = key;
-		write_unlock(&keyring->lock);
+		klist->keys[klist->nkeys] = key;
+		smp_wmb();
+		klist->nkeys++;
+		smp_wmb();
 
 		ret = 0;
 	}
@@ -723,6 +730,8 @@
 			max += klist->maxkeys;
 
 		ret = -ENFILE;
+		if (max > 65535)
+			goto error3;
 		size = sizeof(*klist) + sizeof(*key) * max;
 		if (size > PAGE_SIZE)
 			goto error3;
@@ -743,14 +752,13 @@
 
 		/* add the key into the new space */
 		atomic_inc(&key->usage);
-
-		write_lock(&keyring->lock);
-		keyring->payload.subscriptions = nklist;
 		nklist->keys[nklist->nkeys++] = key;
-		write_unlock(&keyring->lock);
+
+		rcu_assign_pointer(keyring->payload.subscriptions, nklist);
 
 		/* dispose of the old keyring list */
-		kfree(klist);
+		if (klist)
+			call_rcu(&klist->rcu, keyring_link_rcu_disposal);
 
 		ret = 0;
 	}
@@ -791,11 +799,26 @@
 
 /*****************************************************************************/
 /*
+ * dispose of a keyring list after the RCU grace period, freeing the unlinked
+ * key
+ */
+static void keyring_unlink_rcu_disposal(struct rcu_head *rcu)
+{
+	struct keyring_list *klist =
+		container_of(rcu, struct keyring_list, rcu);
+
+	key_put(klist->keys[klist->delkey]);
+	kfree(klist);
+
+} /* end keyring_unlink_rcu_disposal() */
+
+/*****************************************************************************/
+/*
  * unlink the first link to a key from a keyring
  */
 int key_unlink(struct key *keyring, struct key *key)
 {
-	struct keyring_list *klist;
+	struct keyring_list *klist, *nklist;
 	int loop, ret;
 
 	key_check(keyring);
@@ -819,31 +842,45 @@
 	ret = -ENOENT;
 	goto error;
 
- key_is_present:
+key_is_present:
+	/* we need to copy the key list for RCU purposes */
+	nklist = kmalloc(sizeof(*klist) + sizeof(*key) * klist->maxkeys,
+			 GFP_KERNEL);
+	if (!nklist)
+		goto nomem;
+	nklist->maxkeys = klist->maxkeys;
+	nklist->nkeys = klist->nkeys - 1;
+
+	if (loop > 0)
+		memcpy(&nklist->keys[0],
+		       &klist->keys[0],
+		       loop * sizeof(klist->keys[0]));
+
+	if (loop < nklist->nkeys)
+		memcpy(&nklist->keys[loop],
+		       &klist->keys[loop + 1],
+		       (nklist->nkeys - loop) * sizeof(klist->keys[0]));
+
 	/* adjust the user's quota */
 	key_payload_reserve(keyring,
 			    keyring->datalen - KEYQUOTA_LINK_BYTES);
 
-	/* shuffle down the key pointers
-	 * - it might be worth shrinking the allocated memory, but that runs
-	 *   the risk of ENOMEM as we would have to copy
-	 */
-	write_lock(&keyring->lock);
-
-	klist->nkeys--;
-	if (loop < klist->nkeys)
-		memcpy(&klist->keys[loop],
-		       &klist->keys[loop + 1],
-		       (klist->nkeys - loop) * sizeof(struct key *));
-
-	write_unlock(&keyring->lock);
+	rcu_assign_pointer(keyring->payload.subscriptions, nklist);
 
 	up_write(&keyring->sem);
-	key_put(key);
+
+	/* schedule for later cleanup */
+	klist->delkey = loop;
+	call_rcu(&klist->rcu, keyring_unlink_rcu_disposal);
+
 	ret = 0;
 
- error:
+error:
 	return ret;
+nomem:
+	ret = -ENOMEM;
+	up_write(&keyring->sem);
+	goto error;
 
 } /* end key_unlink() */
 
@@ -851,13 +888,32 @@
 
 /*****************************************************************************/
 /*
+ * dispose of a keyring list after the RCU grace period, releasing the keys it
+ * links to
+ */
+static void keyring_clear_rcu_disposal(struct rcu_head *rcu)
+{
+	struct keyring_list *klist;
+	int loop;
+
+	klist = container_of(rcu, struct keyring_list, rcu);
+
+	for (loop = klist->nkeys - 1; loop >= 0; loop--)
+		key_put(klist->keys[loop]);
+
+	kfree(klist);
+
+} /* end keyring_clear_rcu_disposal() */
+
+/*****************************************************************************/
+/*
  * clear the specified process keyring
  * - implements keyctl(KEYCTL_CLEAR)
  */
 int keyring_clear(struct key *keyring)
 {
 	struct keyring_list *klist;
-	int loop, ret;
+	int ret;
 
 	ret = -ENOTDIR;
 	if (keyring->type == &key_type_keyring) {
@@ -870,20 +926,15 @@
 			key_payload_reserve(keyring,
 					    sizeof(struct keyring_list));
 
-			write_lock(&keyring->lock);
-			keyring->payload.subscriptions = NULL;
-			write_unlock(&keyring->lock);
+			rcu_assign_pointer(keyring->payload.subscriptions,
+					   NULL);
 		}
 
 		up_write(&keyring->sem);
 
 		/* free the keys after the locks have been dropped */
-		if (klist) {
-			for (loop = klist->nkeys - 1; loop >= 0; loop--)
-				key_put(klist->keys[loop]);
-
-			kfree(klist);
-		}
+		if (klist)
+			call_rcu(&klist->rcu, keyring_clear_rcu_disposal);
 
 		ret = 0;
 	}