dlm: use rsbtbl as resource directory

Remove the dir hash table (dirtbl), and use
the rsb hash table (rsbtbl) as the resource
directory.  The dirtbl has always been an
unnecessary duplication of information.

This improves efficiency by using a single rsbtbl
lookup in many cases where both rsbtbl and dirtbl
lookups were needed previously.

This eliminates the need to handle cases of rsbtbl
and dirtbl being out of sync.

In many cases there will be memory savings because
the dir hash table no longer exists.

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 7554e4d..3c025fe 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -361,9 +361,8 @@
  * rsb's to consider.
  */
 
-static void set_new_master(struct dlm_rsb *r, int nodeid)
+static void set_new_master(struct dlm_rsb *r)
 {
-	r->res_nodeid = nodeid;
 	set_master_lkbs(r);
 	rsb_set_flag(r, RSB_NEW_MASTER);
 	rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +371,48 @@
 /*
  * We do async lookups on rsb's that need new masters.  The rsb's
  * waiting for a lookup reply are kept on the recover_list.
+ *
+ * Another node recovering the master may have sent us a rcom lookup,
+ * and our dlm_master_lookup() set it as the new master, along with
+ * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
+ * equals our_nodeid below).
  */
 
-static int recover_master(struct dlm_rsb *r)
+static int recover_master(struct dlm_rsb *r, unsigned int *count)
 {
 	struct dlm_ls *ls = r->res_ls;
-	int error, ret_nodeid;
-	int our_nodeid = dlm_our_nodeid();
-	int dir_nodeid = dlm_dir_nodeid(r);
+	int our_nodeid, dir_nodeid;
+	int is_removed = 0;
+	int error;
+
+	if (is_master(r))
+		return 0;
+
+	is_removed = dlm_is_removed(ls, r->res_nodeid);
+
+	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
+		return 0;
+
+	our_nodeid = dlm_our_nodeid();
+	dir_nodeid = dlm_dir_nodeid(r);
 
 	if (dir_nodeid == our_nodeid) {
-		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
-				       r->res_length, &ret_nodeid);
-		if (error)
-			log_error(ls, "recover dir lookup error %d", error);
+		if (is_removed) {
+			r->res_master_nodeid = our_nodeid;
+			r->res_nodeid = 0;
+		}
 
-		if (ret_nodeid == our_nodeid)
-			ret_nodeid = 0;
-		lock_rsb(r);
-		set_new_master(r, ret_nodeid);
-		unlock_rsb(r);
+		/* set master of lkbs to ourself when is_removed, or to
+		   another new master which we set along with NEW_MASTER
+		   in dlm_master_lookup */
+		set_new_master(r);
+		error = 0;
 	} else {
 		recover_list_add(r);
 		error = dlm_send_rcom_lookup(r, dir_nodeid);
 	}
 
+	(*count)++;
 	return error;
 }
 
@@ -415,7 +431,7 @@
  * resent.
  */
 
-static int recover_master_static(struct dlm_rsb *r)
+static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
 {
 	int dir_nodeid = dlm_dir_nodeid(r);
 	int new_master = dir_nodeid;
@@ -423,11 +439,12 @@
 	if (dir_nodeid == dlm_our_nodeid())
 		new_master = 0;
 
-	lock_rsb(r);
 	dlm_purge_mstcpy_locks(r);
-	set_new_master(r, new_master);
-	unlock_rsb(r);
-	return 1;
+	r->res_master_nodeid = dir_nodeid;
+	r->res_nodeid = new_master;
+	set_new_master(r);
+	(*count)++;
+	return 0;
 }
 
 /*
@@ -443,7 +460,10 @@
 int dlm_recover_masters(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	int error = 0, count = 0;
+	unsigned int total = 0;
+	unsigned int count = 0;
+	int nodir = dlm_no_directory(ls);
+	int error;
 
 	log_debug(ls, "dlm_recover_masters");
 
@@ -455,20 +475,23 @@
 			goto out;
 		}
 
-		if (dlm_no_directory(ls))
-			count += recover_master_static(r);
-		else if (!is_master(r) &&
-			 (dlm_is_removed(ls, r->res_nodeid) ||
-			  rsb_flag(r, RSB_NEW_MASTER))) {
-			recover_master(r);
-			count++;
-		}
+		lock_rsb(r);
+		if (nodir)
+			error = recover_master_static(r, &count);
+		else
+			error = recover_master(r, &count);
+		unlock_rsb(r);
+		cond_resched();
+		total++;
 
-		schedule();
+		if (error) {
+			up_read(&ls->ls_root_sem);
+			goto out;
+		}
 	}
 	up_read(&ls->ls_root_sem);
 
-	log_debug(ls, "dlm_recover_masters %d resources", count);
+	log_debug(ls, "dlm_recover_masters %u of %u", count, total);
 
 	error = dlm_wait_function(ls, &recover_list_empty);
  out:
@@ -480,7 +503,7 @@
 int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
 {
 	struct dlm_rsb *r;
-	int nodeid;
+	int ret_nodeid, new_master;
 
 	r = recover_list_find(ls, rc->rc_id);
 	if (!r) {
@@ -489,12 +512,17 @@
 		goto out;
 	}
 
-	nodeid = rc->rc_result;
-	if (nodeid == dlm_our_nodeid())
-		nodeid = 0;
+	ret_nodeid = rc->rc_result;
+
+	if (ret_nodeid == dlm_our_nodeid())
+		new_master = 0;
+	else
+		new_master = ret_nodeid;
 
 	lock_rsb(r);
-	set_new_master(r, nodeid);
+	r->res_master_nodeid = ret_nodeid;
+	r->res_nodeid = new_master;
+	set_new_master(r);
 	unlock_rsb(r);
 	recover_list_del(r);
 
@@ -791,20 +819,8 @@
 			dlm_hold_rsb(r);
 		}
 
-		/* If we're using a directory, add tossed rsbs to the root
-		   list; they'll have entries created in the new directory,
-		   but no other recovery steps should do anything with them. */
-
-		if (dlm_no_directory(ls)) {
-			spin_unlock(&ls->ls_rsbtbl[i].lock);
-			continue;
-		}
-
-		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
-			r = rb_entry(n, struct dlm_rsb, res_hashnode);
-			list_add(&r->res_root_list, &ls->ls_root_list);
-			dlm_hold_rsb(r);
-		}
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
+			log_error(ls, "dlm_create_root_list toss not empty");
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
  out:
@@ -824,28 +840,26 @@
 	up_write(&ls->ls_root_sem);
 }
 
-/* If not using a directory, clear the entire toss list, there's no benefit to
-   caching the master value since it's fixed.  If we are using a dir, keep the
-   rsb's we're the master of.  Recovery will add them to the root list and from
-   there they'll be entered in the rebuilt directory. */
-
-void dlm_clear_toss_list(struct dlm_ls *ls)
+void dlm_clear_toss(struct dlm_ls *ls)
 {
 	struct rb_node *n, *next;
-	struct dlm_rsb *rsb;
+	struct dlm_rsb *r;
+	unsigned int count = 0;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
 		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
-			next = rb_next(n);;
-			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-			if (dlm_no_directory(ls) || !is_master(rsb)) {
-				rb_erase(n, &ls->ls_rsbtbl[i].toss);
-				dlm_free_rsb(rsb);
-			}
+			next = rb_next(n);
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
+			dlm_free_rsb(r);
+			count++;
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
 	}
+
+	if (count)
+		log_debug(ls, "dlm_clear_toss %u done", count);
 }