NFSv4: Reduce the chances of an open_owner identifier collision

Currently we just use a 32-bit counter.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 6b42440..ccb4550 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -130,7 +130,6 @@
 #ifdef CONFIG_NFS_V4
 	init_rwsem(&clp->cl_sem);
 	INIT_LIST_HEAD(&clp->cl_delegations);
-	INIT_LIST_HEAD(&clp->cl_state_owners);
 	spin_lock_init(&clp->cl_lock);
 	INIT_DELAYED_WORK(&clp->cl_renewd, nfs4_renew_state);
 	rpc_init_wait_queue(&clp->cl_rpcwaitq, "NFS client");
@@ -154,7 +153,7 @@
 #ifdef CONFIG_NFS_V4
 	if (__test_and_clear_bit(NFS_CS_RENEWD, &clp->cl_res_state))
 		nfs4_kill_renewd(clp);
-	BUG_ON(!list_empty(&clp->cl_state_owners));
+	BUG_ON(!RB_EMPTY_ROOT(&clp->cl_state_owners));
 	if (__test_and_clear_bit(NFS_CS_IDMAP, &clp->cl_res_state))
 		nfs_idmap_delete(clp);
 #endif
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index c97a0ad..44b56c9 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -70,19 +70,25 @@
 		seqid->flags |= NFS_SEQID_CONFIRMED;
 }
 
+struct nfs_unique_id {
+	struct rb_node rb_node;
+	__u64 id;
+};
+
 /*
  * NFS4 state_owners and lock_owners are simply labels for ordered
  * sequences of RPC calls. Their sole purpose is to provide once-only
  * semantics by allowing the server to identify replayed requests.
  */
 struct nfs4_state_owner {
-	spinlock_t	     so_lock;
-	struct list_head     so_list;	 /* per-clientid list of state_owners */
+	struct nfs_unique_id so_owner_id;
 	struct nfs_client    *so_client;
-	u32                  so_id;      /* 32-bit identifier, unique */
-	atomic_t	     so_count;
+	struct rb_node	     so_client_node;
 
 	struct rpc_cred	     *so_cred;	 /* Associated cred */
+
+	spinlock_t	     so_lock;
+	atomic_t	     so_count;
 	struct list_head     so_states;
 	struct list_head     so_delegations;
 	struct nfs_seqid_counter so_seqid;
@@ -108,7 +114,7 @@
 #define NFS_LOCK_INITIALIZED 1
 	int			ls_flags;
 	struct nfs_seqid_counter	ls_seqid;
-	u32			ls_id;
+	struct nfs_unique_id	ls_id;
 	nfs4_stateid		ls_stateid;
 	atomic_t		ls_count;
 };
@@ -189,7 +195,6 @@
 
 /* nfs4state.c */
 struct rpc_cred *nfs4_get_renew_cred(struct nfs_client *clp);
-extern u32 nfs4_alloc_lockowner_id(struct nfs_client *);
 
 extern struct nfs4_state_owner * nfs4_get_state_owner(struct nfs_server *, struct rpc_cred *);
 extern void nfs4_put_state_owner(struct nfs4_state_owner *);
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 84d0b7e..1840ebc 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -253,7 +253,7 @@
 	p->o_arg.fh = NFS_FH(dir);
 	p->o_arg.open_flags = flags,
 	p->o_arg.clientid = server->nfs_client->cl_clientid;
-	p->o_arg.id = sp->so_id;
+	p->o_arg.id = sp->so_owner_id.id;
 	p->o_arg.name = &p->path.dentry->d_name;
 	p->o_arg.server = server;
 	p->o_arg.bitmask = server->attr_bitmask;
@@ -651,7 +651,7 @@
 	if (nfs_wait_on_sequence(data->o_arg.seqid, task) != 0)
 		return;
 	/* Update sequence id. */
-	data->o_arg.id = sp->so_id;
+	data->o_arg.id = sp->so_owner_id.id;
 	data->o_arg.clientid = sp->so_client->cl_clientid;
 	if (data->o_arg.claim == NFS4_OPEN_CLAIM_PREVIOUS)
 		msg.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_OPEN_NOATTR];
@@ -3029,7 +3029,7 @@
 	if (status != 0)
 		goto out;
 	lsp = request->fl_u.nfs4_fl.owner;
-	arg.lock_owner.id = lsp->ls_id; 
+	arg.lock_owner.id = lsp->ls_id.id;
 	status = rpc_call_sync(server->client, &msg, 0);
 	switch (status) {
 		case 0:
@@ -3243,7 +3243,7 @@
 		goto out_free;
 	p->arg.lock_stateid = &lsp->ls_stateid;
 	p->arg.lock_owner.clientid = server->nfs_client->cl_clientid;
-	p->arg.lock_owner.id = lsp->ls_id;
+	p->arg.lock_owner.id = lsp->ls_id.id;
 	p->lsp = lsp;
 	atomic_inc(&lsp->ls_count);
 	p->ctx = get_nfs_open_context(ctx);
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index 0f79d56..ab0b5ab 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -44,6 +44,7 @@
 #include <linux/nfs_idmap.h>
 #include <linux/kthread.h>
 #include <linux/module.h>
+#include <linux/random.h>
 #include <linux/workqueue.h>
 #include <linux/bitops.h>
 
@@ -69,18 +70,14 @@
 	return status;
 }
 
-u32
-nfs4_alloc_lockowner_id(struct nfs_client *clp)
-{
-	return clp->cl_lockowner_id ++;
-}
-
 struct rpc_cred *nfs4_get_renew_cred(struct nfs_client *clp)
 {
 	struct nfs4_state_owner *sp;
+	struct rb_node *pos;
 	struct rpc_cred *cred = NULL;
 
-	list_for_each_entry(sp, &clp->cl_state_owners, so_list) {
+	for (pos = rb_first(&clp->cl_state_owners); pos != NULL; pos = rb_next(pos)) {
+		sp = rb_entry(pos, struct nfs4_state_owner, so_client_node);
 		if (list_empty(&sp->so_states))
 			continue;
 		cred = get_rpccred(sp->so_cred);
@@ -92,32 +89,129 @@
 static struct rpc_cred *nfs4_get_setclientid_cred(struct nfs_client *clp)
 {
 	struct nfs4_state_owner *sp;
+	struct rb_node *pos;
 
-	if (!list_empty(&clp->cl_state_owners)) {
-		sp = list_entry(clp->cl_state_owners.next,
-				struct nfs4_state_owner, so_list);
+	pos = rb_first(&clp->cl_state_owners);
+	if (pos != NULL) {
+		sp = rb_entry(pos, struct nfs4_state_owner, so_client_node);
 		return get_rpccred(sp->so_cred);
 	}
 	return NULL;
 }
 
+static void nfs_alloc_unique_id(struct rb_root *root, struct nfs_unique_id *new,
+		__u64 minval, int maxbits)
+{
+	struct rb_node **p, *parent;
+	struct nfs_unique_id *pos;
+	__u64 mask = ~0ULL;
+
+	if (maxbits < 64)
+		mask = (1ULL << maxbits) - 1ULL;
+
+	/* Ensure distribution is more or less flat */
+	get_random_bytes(&new->id, sizeof(new->id));
+	new->id &= mask;
+	if (new->id < minval)
+		new->id += minval;
+retry:
+	p = &root->rb_node;
+	parent = NULL;
+
+	while (*p != NULL) {
+		parent = *p;
+		pos = rb_entry(parent, struct nfs_unique_id, rb_node);
+
+		if (new->id < pos->id)
+			p = &(*p)->rb_left;
+		else if (new->id > pos->id)
+			p = &(*p)->rb_right;
+		else
+			goto id_exists;
+	}
+	rb_link_node(&new->rb_node, parent, p);
+	rb_insert_color(&new->rb_node, root);
+	return;
+id_exists:
+	for (;;) {
+		new->id++;
+		if (new->id < minval || (new->id & mask) != new->id) {
+			new->id = minval;
+			break;
+		}
+		parent = rb_next(parent);
+		if (parent == NULL)
+			break;
+		pos = rb_entry(parent, struct nfs_unique_id, rb_node);
+		if (new->id < pos->id)
+			break;
+	}
+	goto retry;
+}
+
+static void nfs_free_unique_id(struct rb_root *root, struct nfs_unique_id *id)
+{
+	rb_erase(&id->rb_node, root);
+}
+
 static struct nfs4_state_owner *
 nfs4_find_state_owner(struct nfs_client *clp, struct rpc_cred *cred)
 {
+	struct rb_node **p = &clp->cl_state_owners.rb_node,
+		       *parent = NULL;
 	struct nfs4_state_owner *sp, *res = NULL;
 
-	list_for_each_entry(sp, &clp->cl_state_owners, so_list) {
-		if (sp->so_cred != cred)
-			continue;
-		atomic_inc(&sp->so_count);
-		/* Move to the head of the list */
-		list_move(&sp->so_list, &clp->cl_state_owners);
-		res = sp;
-		break;
+	while (*p != NULL) {
+		parent = *p;
+		sp = rb_entry(parent, struct nfs4_state_owner, so_client_node);
+
+		if (cred < sp->so_cred)
+			p = &parent->rb_left;
+		else if (cred > sp->so_cred)
+			p = &parent->rb_right;
+		else {
+			atomic_inc(&sp->so_count);
+			res = sp;
+			break;
+		}
 	}
 	return res;
 }
 
+static struct nfs4_state_owner *
+nfs4_insert_state_owner(struct nfs_client *clp, struct nfs4_state_owner *new)
+{
+	struct rb_node **p = &clp->cl_state_owners.rb_node,
+		       *parent = NULL;
+	struct nfs4_state_owner *sp;
+
+	while (*p != NULL) {
+		parent = *p;
+		sp = rb_entry(parent, struct nfs4_state_owner, so_client_node);
+
+		if (new->so_cred < sp->so_cred)
+			p = &parent->rb_left;
+		else if (new->so_cred > sp->so_cred)
+			p = &parent->rb_right;
+		else {
+			atomic_inc(&sp->so_count);
+			return sp;
+		}
+	}
+	nfs_alloc_unique_id(&clp->cl_openowner_id, &new->so_owner_id, 1, 64);
+	rb_link_node(&new->so_client_node, parent, p);
+	rb_insert_color(&new->so_client_node, &clp->cl_state_owners);
+	return new;
+}
+
+static void
+nfs4_remove_state_owner(struct nfs_client *clp, struct nfs4_state_owner *sp)
+{
+	if (!RB_EMPTY_NODE(&sp->so_client_node))
+		rb_erase(&sp->so_client_node, &clp->cl_state_owners);
+	nfs_free_unique_id(&clp->cl_openowner_id, &sp->so_owner_id);
+}
+
 /*
  * nfs4_alloc_state_owner(): this is called on the OPEN or CREATE path to
  * create a new state_owner.
@@ -145,10 +239,14 @@
 void
 nfs4_drop_state_owner(struct nfs4_state_owner *sp)
 {
-	struct nfs_client *clp = sp->so_client;
-	spin_lock(&clp->cl_lock);
-	list_del_init(&sp->so_list);
-	spin_unlock(&clp->cl_lock);
+	if (!RB_EMPTY_NODE(&sp->so_client_node)) {
+		struct nfs_client *clp = sp->so_client;
+
+		spin_lock(&clp->cl_lock);
+		rb_erase(&sp->so_client_node, &clp->cl_state_owners);
+		RB_CLEAR_NODE(&sp->so_client_node);
+		spin_unlock(&clp->cl_lock);
+	}
 }
 
 /*
@@ -160,22 +258,24 @@
 	struct nfs_client *clp = server->nfs_client;
 	struct nfs4_state_owner *sp, *new;
 
-	new = nfs4_alloc_state_owner();
 	spin_lock(&clp->cl_lock);
 	sp = nfs4_find_state_owner(clp, cred);
-	if (sp == NULL && new != NULL) {
-		list_add(&new->so_list, &clp->cl_state_owners);
-		new->so_client = clp;
-		new->so_id = nfs4_alloc_lockowner_id(clp);
-		new->so_cred = get_rpccred(cred);
-		sp = new;
-		new = NULL;
-	}
 	spin_unlock(&clp->cl_lock);
-	kfree(new);
 	if (sp != NULL)
 		return sp;
-	return NULL;
+	new = nfs4_alloc_state_owner();
+	if (new == NULL)
+		return NULL;
+	new->so_client = clp;
+	new->so_cred = cred;
+	spin_lock(&clp->cl_lock);
+	sp = nfs4_insert_state_owner(clp, new);
+	spin_unlock(&clp->cl_lock);
+	if (sp == new)
+		get_rpccred(cred);
+	else
+		kfree(new);
+	return sp;
 }
 
 /*
@@ -189,7 +289,7 @@
 
 	if (!atomic_dec_and_lock(&sp->so_count, &clp->cl_lock))
 		return;
-	list_del(&sp->so_list);
+	nfs4_remove_state_owner(clp, sp);
 	spin_unlock(&clp->cl_lock);
 	put_rpccred(cred);
 	kfree(sp);
@@ -386,12 +486,22 @@
 	atomic_set(&lsp->ls_count, 1);
 	lsp->ls_owner = fl_owner;
 	spin_lock(&clp->cl_lock);
-	lsp->ls_id = nfs4_alloc_lockowner_id(clp);
+	nfs_alloc_unique_id(&clp->cl_lockowner_id, &lsp->ls_id, 1, 64);
 	spin_unlock(&clp->cl_lock);
 	INIT_LIST_HEAD(&lsp->ls_locks);
 	return lsp;
 }
 
+static void nfs4_free_lock_state(struct nfs4_lock_state *lsp)
+{
+	struct nfs_client *clp = lsp->ls_state->owner->so_client;
+
+	spin_lock(&clp->cl_lock);
+	nfs_free_unique_id(&clp->cl_lockowner_id, &lsp->ls_id);
+	spin_unlock(&clp->cl_lock);
+	kfree(lsp);
+}
+
 /*
  * Return a compatible lock_state. If no initialized lock_state structure
  * exists, return an uninitialized one.
@@ -421,7 +531,8 @@
 			return NULL;
 	}
 	spin_unlock(&state->state_lock);
-	kfree(new);
+	if (new != NULL)
+		nfs4_free_lock_state(new);
 	return lsp;
 }
 
@@ -442,7 +553,7 @@
 	if (list_empty(&state->lock_states))
 		clear_bit(LK_STATE_IN_USE, &state->flags);
 	spin_unlock(&state->state_lock);
-	kfree(lsp);
+	nfs4_free_lock_state(lsp);
 }
 
 static void nfs4_fl_copy_lock(struct file_lock *dst, struct file_lock *src)
@@ -719,11 +830,13 @@
 static void nfs4_state_mark_reclaim(struct nfs_client *clp)
 {
 	struct nfs4_state_owner *sp;
+	struct rb_node *pos;
 	struct nfs4_state *state;
 	struct nfs4_lock_state *lock;
 
 	/* Reset all sequence ids to zero */
-	list_for_each_entry(sp, &clp->cl_state_owners, so_list) {
+	for (pos = rb_first(&clp->cl_state_owners); pos != NULL; pos = rb_next(pos)) {
+		sp = rb_entry(pos, struct nfs4_state_owner, so_client_node);
 		sp->so_seqid.counter = 0;
 		sp->so_seqid.flags = 0;
 		spin_lock(&sp->so_lock);
@@ -742,6 +855,7 @@
 {
 	struct nfs_client *clp = ptr;
 	struct nfs4_state_owner *sp;
+	struct rb_node *pos;
 	struct nfs4_state_recovery_ops *ops;
 	struct rpc_cred *cred;
 	int status = 0;
@@ -787,7 +901,8 @@
 	/* Mark all delegations for reclaim */
 	nfs_delegation_mark_reclaim(clp);
 	/* Note: list is protected by exclusive lock on cl->cl_sem */
-	list_for_each_entry(sp, &clp->cl_state_owners, so_list) {
+	for (pos = rb_first(&clp->cl_state_owners); pos != NULL; pos = rb_next(pos)) {
+		sp = rb_entry(pos, struct nfs4_state_owner, so_client_node);
 		status = nfs4_reclaim_open_state(ops, sp);
 		if (status < 0) {
 			if (status == -NFS4ERR_NO_GRACE) {
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 4c8f67d..c087384 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -68,10 +68,10 @@
 #endif
 
 /* lock,open owner id: 
- * we currently use size 1 (u32) out of (NFS4_OPAQUE_LIMIT  >> 2)
+ * we currently use size 2 (u64) out of (NFS4_OPAQUE_LIMIT  >> 2)
  */
-#define open_owner_id_maxsz	(1 + 1)
-#define lock_owner_id_maxsz	(1 + 1)
+#define open_owner_id_maxsz	(1 + 4)
+#define lock_owner_id_maxsz	(1 + 4)
 #define compound_encode_hdr_maxsz	(3 + (NFS4_MAXTAGLEN >> 2))
 #define compound_decode_hdr_maxsz	(3 + (NFS4_MAXTAGLEN >> 2))
 #define op_encode_hdr_maxsz	(1)
@@ -827,13 +827,14 @@
 	WRITE64(nfs4_lock_length(args->fl));
 	WRITE32(args->new_lock_owner);
 	if (args->new_lock_owner){
-		RESERVE_SPACE(4+NFS4_STATEID_SIZE+20);
+		RESERVE_SPACE(4+NFS4_STATEID_SIZE+32);
 		WRITE32(args->open_seqid->sequence->counter);
 		WRITEMEM(args->open_stateid->data, NFS4_STATEID_SIZE);
 		WRITE32(args->lock_seqid->sequence->counter);
 		WRITE64(args->lock_owner.clientid);
-		WRITE32(4);
-		WRITE32(args->lock_owner.id);
+		WRITE32(16);
+		WRITEMEM("lock id:", 8);
+		WRITE64(args->lock_owner.id);
 	}
 	else {
 		RESERVE_SPACE(NFS4_STATEID_SIZE+4);
@@ -848,14 +849,15 @@
 {
 	__be32 *p;
 
-	RESERVE_SPACE(40);
+	RESERVE_SPACE(52);
 	WRITE32(OP_LOCKT);
 	WRITE32(nfs4_lock_type(args->fl, 0));
 	WRITE64(args->fl->fl_start);
 	WRITE64(nfs4_lock_length(args->fl));
 	WRITE64(args->lock_owner.clientid);
-	WRITE32(4);
-	WRITE32(args->lock_owner.id);
+	WRITE32(16);
+	WRITEMEM("lock id:", 8);
+	WRITE64(args->lock_owner.id);
 
 	return 0;
 }
@@ -920,10 +922,11 @@
 	WRITE32(OP_OPEN);
 	WRITE32(arg->seqid->sequence->counter);
 	encode_share_access(xdr, arg->open_flags);
-	RESERVE_SPACE(16);
+	RESERVE_SPACE(28);
 	WRITE64(arg->clientid);
-	WRITE32(4);
-	WRITE32(arg->id);
+	WRITE32(16);
+	WRITEMEM("open id:", 8);
+	WRITE64(arg->id);
 }
 
 static inline void encode_createmode(struct xdr_stream *xdr, const struct nfs_openargs *arg)