dlm: add node slots and generation

Slot numbers are assigned to nodes when they join the lockspace.
The slot number chosen is the minimum unused value starting at 1.
Once a node is assigned a slot, that slot number will not change
while the node remains a lockspace member.  If the node leaves
and rejoins it can be assigned a new slot number.

A new generation number is also added to a lockspace.  It is
set and incremented during each recovery along with the slot
collection/assignment.

The slot numbers will be passed to gfs2 which will use them as
journal id's.

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 81b2393..34d5adf1f 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -85,14 +85,20 @@
 	return status;
 }
 
+static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
+{
+	ls->ls_recover_status |= status;
+}
+
 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 {
 	spin_lock(&ls->ls_recover_lock);
-	ls->ls_recover_status |= status;
+	_set_recover_status(ls, status);
 	spin_unlock(&ls->ls_recover_lock);
 }
 
-static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
+static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
+			   int save_slots)
 {
 	struct dlm_rcom *rc = ls->ls_recover_buf;
 	struct dlm_member *memb;
@@ -106,10 +112,13 @@
 				goto out;
 			}
 
-			error = dlm_rcom_status(ls, memb->nodeid);
+			error = dlm_rcom_status(ls, memb->nodeid, 0);
 			if (error)
 				goto out;
 
+			if (save_slots)
+				dlm_slot_save(ls, rc, memb);
+
 			if (rc->rc_result & wait_status)
 				break;
 			if (delay < 1000)
@@ -121,7 +130,8 @@
 	return error;
 }
 
-static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
+static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
+			   uint32_t status_flags)
 {
 	struct dlm_rcom *rc = ls->ls_recover_buf;
 	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -132,7 +142,7 @@
 			goto out;
 		}
 
-		error = dlm_rcom_status(ls, nodeid);
+		error = dlm_rcom_status(ls, nodeid, status_flags);
 		if (error)
 			break;
 
@@ -152,18 +162,56 @@
 	int error;
 
 	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
-		error = wait_status_all(ls, status);
+		error = wait_status_all(ls, status, 0);
 		if (!error)
 			dlm_set_recover_status(ls, status_all);
 	} else
-		error = wait_status_low(ls, status_all);
+		error = wait_status_low(ls, status_all, 0);
 
 	return error;
 }
 
 int dlm_recover_members_wait(struct dlm_ls *ls)
 {
-	return wait_status(ls, DLM_RS_NODES);
+	struct dlm_member *memb;
+	struct dlm_slot *slots;
+	int num_slots, slots_size;
+	int error, rv;
+	uint32_t gen;
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		memb->slot = -1;
+		memb->generation = 0;
+	}
+
+	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
+		error = wait_status_all(ls, DLM_RS_NODES, 1);
+		if (error)
+			goto out;
+
+		/* slots array is sparse, slots_size may be > num_slots */
+
+		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
+		if (!rv) {
+			spin_lock(&ls->ls_recover_lock);
+			_set_recover_status(ls, DLM_RS_NODES_ALL);
+			ls->ls_num_slots = num_slots;
+			ls->ls_slots_size = slots_size;
+			ls->ls_slots = slots;
+			ls->ls_generation = gen;
+			spin_unlock(&ls->ls_recover_lock);
+		} else {
+			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
+		}
+	} else {
+		error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
+		if (error)
+			goto out;
+
+		dlm_slots_copy_in(ls);
+	}
+ out:
+	return error;
 }
 
 int dlm_recover_directory_wait(struct dlm_ls *ls)