[DLM] block dlm_recv in recovery transition

Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm.  This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.

The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle.  While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up.  dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero.  This is fixed by
suspending dlm_recv (taking write lock on the rwsem) before aborting the
current recovery.

The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv.  The switch from normal to recovery
mode means dlm_recv goes from processing locking messages, to saving them
for later, and vice versa.  Races are avoided by blocking dlm_recv when
setting the flag that switches between modes.

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 188b91c..ae2fd97f 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -386,7 +386,10 @@
 	dlm_recover_process_copy(ls, rc_in);
 }
 
-static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
+/* If the lockspace doesn't exist then still send a status message
+   back; it's possible that it just doesn't have its global_id yet. */
+
+int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 {
 	struct dlm_rcom *rc;
 	struct rcom_config *rf;
@@ -446,28 +449,11 @@
 	return rv;
 }
 
-/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
+/* Called by dlm_recv; corresponds to dlm_receive_message() but special
    recovery-only comms are sent through here. */
 
-void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
-	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
-	struct dlm_ls *ls;
-
-	dlm_rcom_in(rc);
-
-	/* If the lockspace doesn't exist then still send a status message
-	   back; it's possible that it just doesn't have its global_id yet. */
-
-	ls = dlm_find_lockspace_global(hd->h_lockspace);
-	if (!ls) {
-		log_print("lockspace %x from %d type %x not found",
-			  hd->h_lockspace, nodeid, rc->rc_type);
-		if (rc->rc_type == DLM_RCOM_STATUS)
-			send_ls_not_ready(nodeid, rc);
-		return;
-	}
-
 	if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
 		log_debug(ls, "ignoring recovery message %x from %d",
 			  rc->rc_type, nodeid);
@@ -477,12 +463,6 @@
 	if (is_old_reply(ls, rc))
 		goto out;
 
-	if (nodeid != rc->rc_header.h_nodeid) {
-		log_error(ls, "bad rcom nodeid %d from %d",
-			  rc->rc_header.h_nodeid, nodeid);
-		goto out;
-	}
-
 	switch (rc->rc_type) {
 	case DLM_RCOM_STATUS:
 		receive_rcom_status(ls, rc);
@@ -520,6 +500,6 @@
 		DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
 	}
  out:
-	dlm_put_lockspace(ls);
+	return;
 }