dlm: fixes for nodir mode

The "nodir" mode (statically assign master nodes instead
of using the resource directory) has always been highly
experimental, and never seriously used.  This commit
fixes a number of problems, making nodir much more usable.

- Major change to recovery: recover all locks and restart
  all in-progress operations after recovery.  In some
  cases it's not possible to know which in-progess locks
  to recover, so recover all.  (Most require recovery
  in nodir mode anyway since rehashing changes most
  master nodes.)

- Change the way nodir mode is enabled, from a command
  line mount arg passed through gfs2, into a sysfs
  file managed by dlm_controld, consistent with the
  other config settings.

- Allow recovering MSTCPY locks on an rsb that has not
  yet been turned into a master copy.

- Ignore RCOM_LOCK and RCOM_LOCK_REPLY recovery messages
  from a previous, aborted recovery cycle.  Base this
  on the local recovery status not being in the state
  where any nodes should be sending LOCK messages for the
  current recovery cycle.

- Hold rsb lock around dlm_purge_mstcpy_locks() because it
  may run concurrently with dlm_recover_master_copy().

- Maintain highbast on process-copy lkb's (in addition to
  the master as is usual), because the lkb can switch
  back and forth between being a master and being a
  process copy as the master node changes in recovery.

- When recovering MSTCPY locks, flag rsb's that have
  non-empty convert or waiting queues for granting
  at the end of recovery.  (Rename flag from LOCKS_PURGED
  to RECOVER_GRANT and similar for the recovery function,
  because it's not only resources with purged locks
  that need grant a grant attempt.)

- Replace a couple of unnecessary assertion panics with
  error messages.

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f3ba703..bdafb65 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -161,10 +161,11 @@
 void dlm_print_lkb(struct dlm_lkb *lkb)
 {
 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
-	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d\n",
+	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
-	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
+	       (unsigned long long)lkb->lkb_recover_seq);
 }
 
 static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@
 
 static inline int is_master_copy(struct dlm_lkb *lkb)
 {
-	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
-		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 }
 
@@ -1519,13 +1518,13 @@
 	}
 
 	lkb->lkb_rqmode = DLM_LOCK_IV;
+	lkb->lkb_highbast = 0;
 }
 
 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
 	set_lvb_lock(r, lkb);
 	_grant_lock(r, lkb);
-	lkb->lkb_highbast = 0;
 }
 
 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1887,7 +1886,8 @@
 /* Returns the highest requested mode of all blocked conversions; sets
    cw if there's a blocked conversion to DLM_LOCK_CW. */
 
-static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
+				 unsigned int *count)
 {
 	struct dlm_lkb *lkb, *s;
 	int hi, demoted, quit, grant_restart, demote_restart;
@@ -1906,6 +1906,8 @@
 		if (can_be_granted(r, lkb, 0, &deadlk)) {
 			grant_lock_pending(r, lkb);
 			grant_restart = 1;
+			if (count)
+				(*count)++;
 			continue;
 		}
 
@@ -1939,14 +1941,17 @@
 	return max_t(int, high, hi);
 }
 
-static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
+			      unsigned int *count)
 {
 	struct dlm_lkb *lkb, *s;
 
 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
-		if (can_be_granted(r, lkb, 0, NULL))
+		if (can_be_granted(r, lkb, 0, NULL)) {
 			grant_lock_pending(r, lkb);
-                else {
+			if (count)
+				(*count)++;
+		} else {
 			high = max_t(int, lkb->lkb_rqmode, high);
 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
 				*cw = 1;
@@ -1975,16 +1980,20 @@
 	return 0;
 }
 
-static void grant_pending_locks(struct dlm_rsb *r)
+static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
 {
 	struct dlm_lkb *lkb, *s;
 	int high = DLM_LOCK_IV;
 	int cw = 0;
 
-	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
+	if (!is_master(r)) {
+		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+		dlm_dump_rsb(r);
+		return;
+	}
 
-	high = grant_pending_convert(r, high, &cw);
-	high = grant_pending_wait(r, high, &cw);
+	high = grant_pending_convert(r, high, &cw, count);
+	high = grant_pending_wait(r, high, &cw, count);
 
 	if (high == DLM_LOCK_IV)
 		return;
@@ -2520,7 +2529,7 @@
 	   before we try again to grant this one. */
 
 	if (is_demoted(lkb)) {
-		grant_pending_convert(r, DLM_LOCK_IV, NULL);
+		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
 		if (_can_be_granted(r, lkb, 1)) {
 			grant_lock(r, lkb);
 			queue_cast(r, lkb, 0);
@@ -2548,7 +2557,7 @@
 {
 	switch (error) {
 	case 0:
-		grant_pending_locks(r);
+		grant_pending_locks(r, NULL);
 		/* grant_pending_locks also sends basts */
 		break;
 	case -EAGAIN:
@@ -2571,7 +2580,7 @@
 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
 			      int error)
 {
-	grant_pending_locks(r);
+	grant_pending_locks(r, NULL);
 }
 
 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2592,7 +2601,7 @@
 			      int error)
 {
 	if (error)
-		grant_pending_locks(r);
+		grant_pending_locks(r, NULL);
 }
 
 /*
@@ -3452,8 +3461,9 @@
 		goto fail;
 
 	if (lkb->lkb_remid != ms->m_lkid) {
-		log_error(ls, "receive_convert %x remid %x remote %d %x",
-			  lkb->lkb_id, lkb->lkb_remid,
+		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
+			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
+			  (unsigned long long)lkb->lkb_recover_seq,
 			  ms->m_header.h_nodeid, ms->m_lkid);
 		error = -ENOENT;
 		goto fail;
@@ -3631,6 +3641,7 @@
 		goto out;
 
 	queue_bast(r, lkb, ms->m_bastmode);
+	lkb->lkb_highbast = ms->m_bastmode;
  out:
 	unlock_rsb(r);
 	put_rsb(r);
@@ -3710,8 +3721,13 @@
 
 	mstype = lkb->lkb_wait_type;
 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
-	if (error)
+	if (error) {
+		log_error(ls, "receive_request_reply %x remote %d %x result %d",
+			  lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+			  ms->m_result);
+		dlm_dump_rsb(r);
 		goto out;
+	}
 
 	/* Optimization: the dir node was also the master, so it took our
 	   lookup as a request and sent request reply instead of lookup reply */
@@ -4122,21 +4138,28 @@
 	 * happen in normal usage for the async messages and cancel, so
 	 * only use log_debug for them.
 	 *
-	 * Other errors are expected and normal.
+	 * Some errors are expected and normal.
 	 */
 
 	if (error == -ENOENT && noent) {
-		log_debug(ls, "receive %d no %x remote %d %x seq %u",
+		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
 			  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
 			  ms->m_lkid, saved_seq);
 	} else if (error == -ENOENT) {
-		log_error(ls, "receive %d no %x remote %d %x seq %u",
+		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
 			  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
 			  ms->m_lkid, saved_seq);
 
 		if (ms->m_type == DLM_MSG_CONVERT)
 			dlm_dump_rsb_hash(ls, ms->m_hash);
 	}
+
+	if (error == -EINVAL) {
+		log_error(ls, "receive %d inval from %d lkid %x remid %x "
+			  "saved_seq %u",
+			  ms->m_type, ms->m_header.h_nodeid,
+			  ms->m_lkid, ms->m_remid, saved_seq);
+	}
 }
 
 /* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4200,9 +4223,11 @@
 
 	ls = dlm_find_lockspace_global(hd->h_lockspace);
 	if (!ls) {
-		if (dlm_config.ci_log_debug)
-			log_print("invalid lockspace %x from %d cmd %d type %d",
-				  hd->h_lockspace, nodeid, hd->h_cmd, type);
+		if (dlm_config.ci_log_debug) {
+			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
+				"%u from %d cmd %d type %d\n",
+				hd->h_lockspace, nodeid, hd->h_cmd, type);
+		}
 
 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
 			dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4253,18 +4278,12 @@
 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
 				 int dir_nodeid)
 {
+	if (dlm_no_directory(ls))
+		return 1;
+
 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
 		return 1;
 
-	if (!dlm_no_directory(ls))
-		return 0;
-
-	if (dir_nodeid == dlm_our_nodeid())
-		return 1;
-
-	if (dir_nodeid != lkb->lkb_wait_nodeid)
-		return 1;
-
 	return 0;
 }
 
@@ -4519,112 +4538,177 @@
 	return error;
 }
 
-static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
-			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
+static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
+			      struct list_head *list)
 {
-	struct dlm_ls *ls = r->res_ls;
 	struct dlm_lkb *lkb, *safe;
 
-	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
-		if (test(ls, lkb)) {
-			rsb_set_flag(r, RSB_LOCKS_PURGED);
-			del_lkb(r, lkb);
-			/* this put should free the lkb */
-			if (!dlm_put_lkb(lkb))
-				log_error(ls, "purged lkb not released");
-		}
+	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+		if (!is_master_copy(lkb))
+			continue;
+
+		/* don't purge lkbs we've added in recover_master_copy for
+		   the current recovery seq */
+
+		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
+			continue;
+
+		del_lkb(r, lkb);
+
+		/* this put should free the lkb */
+		if (!dlm_put_lkb(lkb))
+			log_error(ls, "purged mstcpy lkb not released");
 	}
 }
 
-static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
-	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
-}
-
-static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
-	return is_master_copy(lkb);
-}
-
-static void purge_dead_locks(struct dlm_rsb *r)
-{
-	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
-	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
-	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
-}
-
 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
 {
-	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
-	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
-	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
+	struct dlm_ls *ls = r->res_ls;
+
+	purge_mstcpy_list(ls, r, &r->res_grantqueue);
+	purge_mstcpy_list(ls, r, &r->res_convertqueue);
+	purge_mstcpy_list(ls, r, &r->res_waitqueue);
+}
+
+static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
+			    struct list_head *list,
+			    int nodeid_gone, unsigned int *count)
+{
+	struct dlm_lkb *lkb, *safe;
+
+	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+		if (!is_master_copy(lkb))
+			continue;
+
+		if ((lkb->lkb_nodeid == nodeid_gone) ||
+		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
+
+			del_lkb(r, lkb);
+
+			/* this put should free the lkb */
+			if (!dlm_put_lkb(lkb))
+				log_error(ls, "purged dead lkb not released");
+
+			rsb_set_flag(r, RSB_RECOVER_GRANT);
+
+			(*count)++;
+		}
+	}
 }
 
 /* Get rid of locks held by nodes that are gone. */
 
-int dlm_purge_locks(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
+	struct dlm_member *memb;
+	int nodes_count = 0;
+	int nodeid_gone = 0;
+	unsigned int lkb_count = 0;
 
-	log_debug(ls, "dlm_purge_locks");
+	/* cache one removed nodeid to optimize the common
+	   case of a single node removed */
+
+	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+		nodes_count++;
+		nodeid_gone = memb->nodeid;
+	}
+
+	if (!nodes_count)
+		return;
 
 	down_write(&ls->ls_root_sem);
 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 		hold_rsb(r);
 		lock_rsb(r);
-		if (is_master(r))
-			purge_dead_locks(r);
+		if (is_master(r)) {
+			purge_dead_list(ls, r, &r->res_grantqueue,
+					nodeid_gone, &lkb_count);
+			purge_dead_list(ls, r, &r->res_convertqueue,
+					nodeid_gone, &lkb_count);
+			purge_dead_list(ls, r, &r->res_waitqueue,
+					nodeid_gone, &lkb_count);
+		}
 		unlock_rsb(r);
 		unhold_rsb(r);
-
-		schedule();
+		cond_resched();
 	}
 	up_write(&ls->ls_root_sem);
 
-	return 0;
+	if (lkb_count)
+		log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
+			  lkb_count, nodes_count);
 }
 
-static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 {
 	struct rb_node *n;
-	struct dlm_rsb *r, *r_ret = NULL;
+	struct dlm_rsb *r;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
-		if (!rsb_flag(r, RSB_LOCKS_PURGED))
+
+		if (!rsb_flag(r, RSB_RECOVER_GRANT))
+			continue;
+		rsb_clear_flag(r, RSB_RECOVER_GRANT);
+		if (!is_master(r))
 			continue;
 		hold_rsb(r);
-		rsb_clear_flag(r, RSB_LOCKS_PURGED);
-		r_ret = r;
-		break;
+		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+		return r;
 	}
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-	return r_ret;
+	return NULL;
 }
 
-void dlm_grant_after_purge(struct dlm_ls *ls)
+/*
+ * Attempt to grant locks on resources that we are the master of.
+ * Locks may have become grantable during recovery because locks
+ * from departed nodes have been purged (or not rebuilt), allowing
+ * previously blocked locks to now be granted.  The subset of rsb's
+ * we are interested in are those with lkb's on either the convert or
+ * waiting queues.
+ *
+ * Simplest would be to go through each master rsb and check for non-empty
+ * convert or waiting queues, and attempt to grant on those rsbs.
+ * Checking the queues requires lock_rsb, though, for which we'd need
+ * to release the rsbtbl lock.  This would make iterating through all
+ * rsb's very inefficient.  So, we rely on earlier recovery routines
+ * to set RECOVER_GRANT on any rsb's that we should attempt to grant
+ * locks for.
+ */
+
+void dlm_recover_grant(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
 	int bucket = 0;
+	unsigned int count = 0;
+	unsigned int rsb_count = 0;
+	unsigned int lkb_count = 0;
 
 	while (1) {
-		r = find_purged_rsb(ls, bucket);
+		r = find_grant_rsb(ls, bucket);
 		if (!r) {
 			if (bucket == ls->ls_rsbtbl_size - 1)
 				break;
 			bucket++;
 			continue;
 		}
+		rsb_count++;
+		count = 0;
 		lock_rsb(r);
-		if (is_master(r)) {
-			grant_pending_locks(r);
-			confirm_master(r, 0);
-		}
+		grant_pending_locks(r, &count);
+		lkb_count += count;
+		confirm_master(r, 0);
 		unlock_rsb(r);
 		put_rsb(r);
-		schedule();
+		cond_resched();
 	}
+
+	if (lkb_count)
+		log_debug(ls, "dlm_recover_grant %u locks on %u resources",
+			  lkb_count, rsb_count);
 }
 
 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4723,11 +4807,26 @@
 
 	remid = le32_to_cpu(rl->rl_lkid);
 
-	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
-			 R_MASTER, &r);
+	/* In general we expect the rsb returned to be R_MASTER, but we don't
+	   have to require it.  Recovery of masters on one node can overlap
+	   recovery of locks on another node, so one node can send us MSTCPY
+	   locks before we've made ourselves master of this rsb.  We can still
+	   add new MSTCPY locks that we receive here without any harm; when
+	   we make ourselves master, dlm_recover_masters() won't touch the
+	   MSTCPY locks we've received early. */
+
+	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
 	if (error)
 		goto out;
 
+	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
+		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
+			  rc->rc_header.h_nodeid, remid);
+		error = -EBADR;
+		put_rsb(r);
+		goto out;
+	}
+
 	lock_rsb(r);
 
 	lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
@@ -4749,12 +4848,18 @@
 	attach_lkb(r, lkb);
 	add_lkb(r, lkb, rl->rl_status);
 	error = 0;
+	ls->ls_recover_locks_in++;
+
+	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+		rsb_set_flag(r, RSB_RECOVER_GRANT);
 
  out_remid:
 	/* this is the new value returned to the lock holder for
 	   saving in its process-copy lkb */
 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
 
+	lkb->lkb_recover_seq = ls->ls_recover_seq;
+
  out_unlock:
 	unlock_rsb(r);
 	put_rsb(r);
@@ -4786,17 +4891,20 @@
 		return error;
 	}
 
-	if (!is_process_copy(lkb)) {
-		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
-			  lkid, rc->rc_header.h_nodeid, remid, result);
-		dlm_print_lkb(lkb);
-		return -EINVAL;
-	}
-
 	r = lkb->lkb_resource;
 	hold_rsb(r);
 	lock_rsb(r);
 
+	if (!is_process_copy(lkb)) {
+		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
+			  lkid, rc->rc_header.h_nodeid, remid, result);
+		dlm_dump_rsb(r);
+		unlock_rsb(r);
+		put_rsb(r);
+		dlm_put_lkb(lkb);
+		return -EINVAL;
+	}
+
 	switch (result) {
 	case -EBADR:
 		/* There's a chance the new master received our lock before