ocfs2: Introduce the new ocfs2_cluster_connect/disconnect() API.

This step introduces a cluster-stack-agnostic API for connecting to and
disconnecting from the cluster stack.  fs/ocfs2/dlmglue.c no longer uses
o2cb/o2dlm knowledge to connect to the stack.  It is all handled in
stackglue.c.

heartbeat.c no longer needs to know how it gets called.
ocfs2_do_node_down() is now a clean recovery trigger.

The big gotcha is the ordering of initializations and de-initializations done
underneath ocfs2_cluster_connect() and ocfs2_cluster_disconnect().
ocfs2_dlm_init() used to do all
o2dlm initialization in one block.  Thus, the o2dlm functionality of
ocfs2_cluster_connect() is very straightforward.  ocfs2_dlm_shutdown(),
however, did a few things between de-registration of the eviction
callback and actually shutting down the domain.  Now de-registration and
shutdown of the domain are wrapped within the single
ocfs2_cluster_disconnect() call.  I've checked the code paths to make
sure we can safely tear down things in ocfs2_dlm_shutdown() before
calling ocfs2_cluster_disconnect().  The filesystem has already set
itself to ignore the callback.

Signed-off-by: Joel Becker <joel.becker@oracle.com>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 4590376..6652a48 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -27,7 +27,6 @@
 #include <linux/slab.h>
 #include <linux/highmem.h>
 #include <linux/mm.h>
-#include <linux/crc32.h>
 #include <linux/kthread.h>
 #include <linux/pagemap.h>
 #include <linux/debugfs.h>
@@ -259,31 +258,6 @@
 	.flags		= 0,
 };
 
-/*
- * This is the filesystem locking protocol version.
- *
- * Whenever the filesystem does new things with locks (adds or removes a
- * lock, orders them differently, does different things underneath a lock),
- * the version must be changed.  The protocol is negotiated when joining
- * the dlm domain.  A node may join the domain if its major version is
- * identical to all other nodes and its minor version is greater than
- * or equal to all other nodes.  When its minor version is greater than
- * the other nodes, it will run at the minor version specified by the
- * other nodes.
- *
- * If a locking change is made that will not be compatible with older
- * versions, the major number must be increased and the minor version set
- * to zero.  If a change merely adds a behavior that can be disabled when
- * speaking to older versions, the minor version must be increased.  If a
- * change adds a fully backwards compatible change (eg, LVB changes that
- * are just ignored by older versions), the version does not need to be
- * updated.
- */
-const struct dlm_protocol_version ocfs2_locking_protocol = {
-	.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
-	.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
-};
-
 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
 {
 	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
@@ -886,7 +860,7 @@
 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-	ret = ocfs2_dlm_lock(osb->dlm,
+	ret = ocfs2_dlm_lock(osb->cconn,
 			     level,
 			     &lockres->l_lksb,
 			     dlm_flags,
@@ -1085,7 +1059,7 @@
 		     lockres->l_name, lockres->l_level, level);
 
 		/* call dlm_lock to upgrade lock now */
-		ret = ocfs2_dlm_lock(osb->dlm,
+		ret = ocfs2_dlm_lock(osb->cconn,
 				     level,
 				     &lockres->l_lksb,
 				     lkm_flags,
@@ -1492,7 +1466,7 @@
 	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-	ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
+	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
 			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
 			     lockres);
 	if (ret) {
@@ -2485,8 +2459,7 @@
 int ocfs2_dlm_init(struct ocfs2_super *osb)
 {
 	int status = 0;
-	u32 dlm_key;
-	struct dlm_ctxt *dlm = NULL;
+	struct ocfs2_cluster_connection *conn = NULL;
 
 	mlog_entry_void();
 
@@ -2508,26 +2481,21 @@
 		goto bail;
 	}
 
-	/* used by the dlm code to make message headers unique, each
-	 * node in this domain must agree on this. */
-	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
-
 	/* for now, uuid == domain */
-	dlm = dlm_register_domain(osb->uuid_str, dlm_key,
-				  &osb->osb_locking_proto);
-	if (IS_ERR(dlm)) {
-		status = PTR_ERR(dlm);
+	status = ocfs2_cluster_connect(osb->uuid_str,
+				       strlen(osb->uuid_str),
+				       ocfs2_do_node_down, osb,
+				       &conn);
+	if (status) {
 		mlog_errno(status);
 		goto bail;
 	}
 
-	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
-
 local:
 	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
 	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
 
-	osb->dlm = dlm;
+	osb->cconn = conn;
 
 	status = 0;
 bail:
@@ -2545,10 +2513,14 @@
 {
 	mlog_entry_void();
 
-	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
-
 	ocfs2_drop_osb_locks(osb);
 
+	/*
+	 * Now that we have dropped all locks and ocfs2_dismount_volume()
+	 * has disabled recovery, the DLM won't be talking to us.  It's
+	 * safe to tear things down before disconnecting the cluster.
+	 */
+
 	if (osb->dc_task) {
 		kthread_stop(osb->dc_task);
 		osb->dc_task = NULL;
@@ -2557,8 +2529,8 @@
 	ocfs2_lock_res_free(&osb->osb_super_lockres);
 	ocfs2_lock_res_free(&osb->osb_rename_lockres);
 
-	dlm_unregister_domain(osb->dlm);
-	osb->dlm = NULL;
+	ocfs2_cluster_disconnect(osb->cconn);
+	osb->cconn = NULL;
 
 	ocfs2_dlm_shutdown_debug(osb);
 
@@ -2689,7 +2661,7 @@
 
 	mlog(0, "lock %s\n", lockres->l_name);
 
-	ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags,
+	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
 			       lockres);
 	if (ret) {
 		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
@@ -2823,7 +2795,7 @@
 	if (lvb)
 		dlm_flags |= DLM_LKF_VALBLK;
 
-	ret = ocfs2_dlm_lock(osb->dlm,
+	ret = ocfs2_dlm_lock(osb->cconn,
 			     new_level,
 			     &lockres->l_lksb,
 			     dlm_flags,
@@ -2882,7 +2854,7 @@
 	mlog_entry_void();
 	mlog(0, "lock %s\n", lockres->l_name);
 
-	ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb,
+	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
 			       DLM_LKF_CANCEL, lockres);
 	if (ret) {
 		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
@@ -3193,7 +3165,34 @@
 	return UNBLOCK_CONTINUE_POST;
 }
 
+/*
+ * This is the filesystem locking protocol.  It provides the lock handling
+ * hooks for the underlying DLM.  It has a maximum version number.
+ * The version number allows interoperability with systems running at
+ * the same major number and an equal or smaller minor number.
+ *
+ * Whenever the filesystem does new things with locks (adds or removes a
+ * lock, orders them differently, does different things underneath a lock),
+ * the version must be changed.  The protocol is negotiated when joining
+ * the dlm domain.  A node may join the domain if its major version is
+ * identical to all other nodes and its minor version is greater than
+ * or equal to all other nodes.  When its minor version is greater than
+ * the other nodes, it will run at the minor version specified by the
+ * other nodes.
+ *
+ * If a locking change is made that will not be compatible with older
+ * versions, the major number must be increased and the minor version set
+ * to zero.  If a change merely adds a behavior that can be disabled when
+ * speaking to older versions, the minor version must be increased.  If a
+ * change adds a fully backwards compatible change (eg, LVB changes that
+ * are just ignored by older versions), the version does not need to be
+ * updated.
+ */
 static struct ocfs2_locking_protocol lproto = {
+	.lp_max_version = {
+		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
+		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
+	},
 	.lp_lock_ast		= ocfs2_locking_ast,
 	.lp_blocking_ast	= ocfs2_blocking_ast,
 	.lp_unlock_ast		= ocfs2_unlock_ast,