[DLM] block dlm_recv in recovery transition

Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm.  This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.

The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle.  While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up.  dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero.  This is fixed by
suspending dlm_recv (taking the rwsem in write mode) before aborting the
current recovery.

The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv.  On the switch from normal to recovery
mode, dlm_recv goes from processing locking messages to saving them for
later; the switch back reverses this.  Races are avoided by blocking
dlm_recv while setting the flag that switches between modes.

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 6657599..4b89e20 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -24,19 +24,28 @@
 
 
 /* If the start for which we're re-enabling locking (seq) has been superseded
-   by a newer stop (ls_recover_seq), we need to leave locking disabled. */
+   by a newer stop (ls_recover_seq), we need to leave locking disabled.
+
+   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
+   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
+   enables locking and clears the requestqueue between a and b. */
 
 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
 
+	down_write(&ls->ls_recv_active);
+
 	spin_lock(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
+		/* unblocks processes waiting to enter the dlm */
 		up_write(&ls->ls_in_recovery);
 		error = 0;
 	}
 	spin_unlock(&ls->ls_recover_lock);
+
+	up_write(&ls->ls_recv_active);
 	return error;
 }