dlm: fixes for nodir mode

The "nodir" mode (statically assign master nodes instead
of using the resource directory) has always been highly
experimental, and never seriously used.  This commit
fixes a number of problems, making nodir much more usable.

- Major change to recovery: recover all locks and restart
  all in-progress operations after recovery.  In some
  cases it's not possible to know which in-progress locks
  to recover, so recover all.  (Most require recovery
  in nodir mode anyway since rehashing changes most
  master nodes.)

- Change the way nodir mode is enabled, from a command
  line mount arg passed through gfs2, to a sysfs
  file managed by dlm_controld, consistent with the
  other config settings.
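
  A minimal sketch of the userspace side, assuming dlm_controld
  writes a per-lockspace "nodir" sysfs file before letting the
  lockspace join complete (the exact path and timing here are
  assumptions):

      /* illustrative only: enable nodir for one lockspace */
      #include <stdio.h>

      static int set_nodir(const char *lsname)
      {
              char path[256];
              FILE *f;

              snprintf(path, sizeof(path),
                       "/sys/kernel/dlm/%s/nodir", lsname);
              f = fopen(path, "w");
              if (!f)
                      return -1;
              /* nonzero selects statically assigned masters */
              fputs("1", f);
              fclose(f);
              return 0;
      }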

- Allow recovering MSTCPY locks on an rsb that has not
  yet been turned into a master copy.

- Ignore RCOM_LOCK and RCOM_LOCK_REPLY recovery messages
  from a previous, aborted recovery cycle.  Detect these
  by checking the local recovery status: if it has not yet
  reached the state where any node should be sending LOCK
  messages for the current cycle, the message is stale.
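
  A sketch of that check, assuming the rcom receive path consults
  dlm_recover_status(); which DLM_RS_* bit gates LOCK messages is
  an assumption here:

      /* illustrative: an RCOM_LOCK[_REPLY] is only valid once this
         node has reached the recovery state (assumed DLM_RS_DIR)
         where peers may send LOCK messages for the current cycle;
         anything arriving earlier is from an aborted cycle */
      static int stale_rcom_lock(struct dlm_ls *ls)
      {
              uint32_t status = dlm_recover_status(ls);

              return !(status & DLM_RS_DIR);
      }

  The receive path would then log and drop the message instead of
  applying it to the new cycle's state.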

- Hold rsb lock around dlm_purge_mstcpy_locks() because it
  may run concurrently with dlm_recover_master_copy().

- Maintain highbast on process-copy lkb's (in addition to
  the master copy, as usual), because the lkb can switch
  back and forth between being a master and being a
  process copy as the master node changes in recovery.
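
  Roughly, the path that delivers a bast to a process copy would
  also track the mode (where this hooks in is an assumption;
  queue_bast() and lkb_highbast are existing names):

      /* illustrative fragment: remember the highest bast mode on
         the process copy too, so it survives a master change */
      queue_bast(r, lkb, ms->m_bastmode);
      if (lkb->lkb_highbast < ms->m_bastmode)
              lkb->lkb_highbast = ms->m_bastmode;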

- When recovering MSTCPY locks, flag rsb's that have
  non-empty convert or waiting queues for granting
  at the end of recovery.  (Rename flag from LOCKS_PURGED
  to RECOVER_GRANT and similar for the recovery function,
  because it's not only resources with purged locks
  that need a grant attempt.)
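
  A sketch of the pass that consumes the flag at the end of
  recovery (grant_pending_locks() stands in for the existing grant
  logic; iterating the rsbs is assumed to happen elsewhere):

      /* illustrative: retry grants on an rsb flagged during
         recovery, then clear the flag */
      static void recover_grant_rsb(struct dlm_rsb *r)
      {
              lock_rsb(r);
              if (rsb_flag(r, RSB_RECOVER_GRANT)) {
                      rsb_clear_flag(r, RSB_RECOVER_GRANT);
                      grant_pending_locks(r);
              }
              unlock_rsb(r);
      }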

- Replace a couple of unnecessary assertion panics with
  error messages.
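
  The pattern is roughly the following; the specific check is
  invented for illustration, but DLM_ASSERT() and log_error() are
  the existing macros:

      /* before: an inconsistency panics the machine */
      DLM_ASSERT(lkb->lkb_nodeid == nodeid, dlm_print_lkb(lkb););

      /* after: report it and continue, letting recovery repair
         the state instead of taking the node down */
      if (lkb->lkb_nodeid != nodeid)
              log_error(ls, "unexpected nodeid %d for lkb %x",
                        lkb->lkb_nodeid, lkb->lkb_id);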

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 34d5adf1f..7554e4d 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -339,9 +339,12 @@
 {
 	struct dlm_lkb *lkb;
 
-	list_for_each_entry(lkb, queue, lkb_statequeue)
-		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
+	list_for_each_entry(lkb, queue, lkb_statequeue) {
+		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
 			lkb->lkb_nodeid = nodeid;
+			lkb->lkb_remid = 0;
+		}
+	}
 }
 
 static void set_master_lkbs(struct dlm_rsb *r)
@@ -354,18 +357,16 @@
 /*
  * Propagate the new master nodeid to locks
  * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
+ * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
  * rsb's to consider.
  */
 
 static void set_new_master(struct dlm_rsb *r, int nodeid)
 {
-	lock_rsb(r);
 	r->res_nodeid = nodeid;
 	set_master_lkbs(r);
 	rsb_set_flag(r, RSB_NEW_MASTER);
 	rsb_set_flag(r, RSB_NEW_MASTER2);
-	unlock_rsb(r);
 }
 
 /*
@@ -376,9 +377,9 @@
 static int recover_master(struct dlm_rsb *r)
 {
 	struct dlm_ls *ls = r->res_ls;
-	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
-
-	dir_nodeid = dlm_dir_nodeid(r);
+	int error, ret_nodeid;
+	int our_nodeid = dlm_our_nodeid();
+	int dir_nodeid = dlm_dir_nodeid(r);
 
 	if (dir_nodeid == our_nodeid) {
 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@@ -388,7 +389,9 @@
 
 		if (ret_nodeid == our_nodeid)
 			ret_nodeid = 0;
+		lock_rsb(r);
 		set_new_master(r, ret_nodeid);
+		unlock_rsb(r);
 	} else {
 		recover_list_add(r);
 		error = dlm_send_rcom_lookup(r, dir_nodeid);
@@ -398,24 +401,33 @@
 }
 
 /*
- * When not using a directory, most resource names will hash to a new static
- * master nodeid and the resource will need to be remastered.
+ * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
+ * This is necessary because recovery can be started, aborted and restarted,
+ * causing the master nodeid to briefly change during the aborted recovery, and
+ * change back to the original value in the second recovery.  The MSTCPY locks
+ * may or may not have been purged during the aborted recovery.  Another node
+ * with an outstanding request in the waiters list and a request reply saved
+ * in the requestqueue cannot know whether it should ignore the reply and
+ * resend the request, or accept the reply and complete the request.  It must
+ * do the former if the remote node purged MSTCPY locks, and it must do the
+ * latter if the remote node did not.  This is solved by always purging
+ * MSTCPY locks, in which case the request reply is always ignored and the
+ * request resent.
  */
 
 static int recover_master_static(struct dlm_rsb *r)
 {
-	int master = dlm_dir_nodeid(r);
+	int dir_nodeid = dlm_dir_nodeid(r);
+	int new_master = dir_nodeid;
 
-	if (master == dlm_our_nodeid())
-		master = 0;
+	if (dir_nodeid == dlm_our_nodeid())
+		new_master = 0;
 
-	if (r->res_nodeid != master) {
-		if (is_master(r))
-			dlm_purge_mstcpy_locks(r);
-		set_new_master(r, master);
-		return 1;
-	}
-	return 0;
+	lock_rsb(r);
+	dlm_purge_mstcpy_locks(r);
+	set_new_master(r, new_master);
+	unlock_rsb(r);
+	return 1;
 }
 
 /*
@@ -481,7 +493,9 @@
 	if (nodeid == dlm_our_nodeid())
 		nodeid = 0;
 
+	lock_rsb(r);
 	set_new_master(r, nodeid);
+	unlock_rsb(r);
 	recover_list_del(r);
 
 	if (recover_list_empty(ls))
@@ -556,8 +570,6 @@
 	struct dlm_rsb *r;
 	int error, count = 0;
 
-	log_debug(ls, "dlm_recover_locks");
-
 	down_read(&ls->ls_root_sem);
 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 		if (is_master(r)) {
@@ -584,7 +596,7 @@
 	}
 	up_read(&ls->ls_root_sem);
 
-	log_debug(ls, "dlm_recover_locks %d locks", count);
+	log_debug(ls, "dlm_recover_locks %d out", count);
 
 	error = dlm_wait_function(ls, &recover_list_empty);
  out:
@@ -721,21 +733,19 @@
 }
 
 /* We've become the new master for this rsb and waiting/converting locks may
-   need to be granted in dlm_grant_after_purge() due to locks that may have
+   need to be granted in dlm_recover_grant() due to locks that may have
    existed from a removed node. */
 
-static void set_locks_purged(struct dlm_rsb *r)
+static void recover_grant(struct dlm_rsb *r)
 {
 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
-		rsb_set_flag(r, RSB_LOCKS_PURGED);
+		rsb_set_flag(r, RSB_RECOVER_GRANT);
 }
 
 void dlm_recover_rsbs(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
-	int count = 0;
-
-	log_debug(ls, "dlm_recover_rsbs");
+	unsigned int count = 0;
 
 	down_read(&ls->ls_root_sem);
 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@@ -744,7 +754,7 @@
 			if (rsb_flag(r, RSB_RECOVER_CONVERT))
 				recover_conversion(r);
 			if (rsb_flag(r, RSB_NEW_MASTER2))
-				set_locks_purged(r);
+				recover_grant(r);
 			recover_lvb(r);
 			count++;
 		}
@@ -754,7 +764,8 @@
 	}
 	up_read(&ls->ls_root_sem);
 
-	log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
+	if (count)
+		log_debug(ls, "dlm_recover_rsbs %d done", count);
 }
 
 /* Create a single list of all root rsb's to be used during recovery */