ocfs2: Introduce the new ocfs2_cluster_connect/disconnect() API.

This step introduces a cluster-stack-agnostic API for initializing and
exiting a cluster connection.  fs/ocfs2/dlmglue.c no longer uses
o2cb/o2dlm knowledge to connect to the stack.  It is all handled in
stackglue.c.

heartbeat.c no longer needs to know how it gets called.
ocfs2_do_node_down() is now a clean recovery trigger.

The big gotcha is the ordering of initializations and de-initializations done
underneath ocfs2_cluster_connect().  ocfs2_dlm_init() used to do all
o2dlm initialization in one block.  Thus, the o2dlm functionality of
ocfs2_cluster_connect() is very straightforward.  ocfs2_dlm_shutdown(),
however, did a few things between de-registration of the eviction
callback and actually shutting down the domain.  Now de-registration and
shutdown of the domain are wrapped within the single
ocfs2_cluster_disconnect() call.  I've checked the code paths to make
sure we can safely tear down things in ocfs2_dlm_shutdown() before
calling ocfs2_cluster_disconnect().  The filesystem has already set
itself to ignore the callback.

Signed-off-by: Joel Becker <joel.becker@oracle.com>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c
index eb88854..f6f309a 100644
--- a/fs/ocfs2/stackglue.c
+++ b/fs/ocfs2/stackglue.c
@@ -18,11 +18,21 @@
  * General Public License for more details.
  */
 
+#include <linux/slab.h>
+#include <linux/crc32.h>
+
+/* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */
+#include <linux/fs.h>
+
 #include "cluster/masklog.h"
 #include "stackglue.h"
 
 static struct ocfs2_locking_protocol *lproto;
 
+struct o2dlm_private {			/* stored in a connection's cc_private */
+	struct dlm_eviction_cb op_eviction_cb;	/* o2dlm eviction hook */
+};
+
 /* These should be identical */
 #if (DLM_LOCK_IV != LKM_IVMODE)
 # error Lock modes do not match
@@ -197,7 +207,7 @@
 	lproto->lp_unlock_ast(astarg, error);
 }
 
-int ocfs2_dlm_lock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn,
 		   int mode,
 		   union ocfs2_dlm_lksb *lksb,
 		   u32 flags,
@@ -212,15 +222,15 @@
 
 	BUG_ON(lproto == NULL);
 
-	status = dlmlock(dlm, o2dlm_mode, &lksb->lksb_o2dlm, o2dlm_flags,
-			 name, namelen,
+	status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm,
+			 o2dlm_flags, name, namelen,
 			 o2dlm_lock_ast_wrapper, astarg,
 			 o2dlm_blocking_ast_wrapper);
 	ret = dlm_status_to_errno(status);
 	return ret;
 }
 
-int ocfs2_dlm_unlock(struct dlm_ctxt *dlm,
+int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn,
 		     union ocfs2_dlm_lksb *lksb,
 		     u32 flags,
 		     void *astarg)
@@ -231,8 +241,8 @@
 
 	BUG_ON(lproto == NULL);
 
-	status = dlmunlock(dlm, &lksb->lksb_o2dlm, o2dlm_flags,
-			   o2dlm_unlock_ast_wrapper, astarg);
+	status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm,
+			   o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg);
 	ret = dlm_status_to_errno(status);
 	return ret;
 }
@@ -252,6 +262,115 @@
 	return (void *)(lksb->lksb_o2dlm.lvb);
 }
 
+/*
+ * Eviction hook invoked by o2dlm (how the classic stack signals node death).
+ * Logs the event and passes the node to the connection's recovery handler.
+ */
+static void o2dlm_eviction_cb(int node_num, void *data)
+{
+	struct ocfs2_cluster_connection *cc = data;
+
+	mlog(ML_NOTICE, "o2dlm has evicted node %d from group %.*s\n", node_num,
+	     cc->cc_namelen, cc->cc_name);
+
+	cc->cc_recovery_handler(node_num, cc->cc_recovery_data);
+}
+
+/*
+ * Connect to the cluster stack for lock group @group (@grouplen bytes long).
+ * Registers the group with dlm_register_domain() and hooks up an eviction
+ * callback that invokes @recovery_handler(node_num, @recovery_data).
+ * Returns 0 and stores the new connection in *@conn, or a negative errno
+ * with *@conn untouched.  Release with ocfs2_cluster_disconnect().
+ */
+int ocfs2_cluster_connect(const char *group,
+			  int grouplen,
+			  void (*recovery_handler)(int node_num,
+						   void *recovery_data),
+			  void *recovery_data,
+			  struct ocfs2_cluster_connection **conn)
+{
+	int rc;
+	struct ocfs2_cluster_connection *new_conn;
+	u32 dlm_key;
+	struct dlm_ctxt *dlm;
+	struct o2dlm_private *priv;
+	struct dlm_protocol_version dlm_version;
+
+	BUG_ON(group == NULL);
+	BUG_ON(conn == NULL);
+	BUG_ON(recovery_handler == NULL);
+	BUG_ON(lproto == NULL);
+
+	/* grouplen is signed; a negative length must never reach memcpy() */
+	if (grouplen < 0 || grouplen > GROUP_NAME_MAX)
+		return -EINVAL;
+
+	new_conn = kzalloc(sizeof(*new_conn), GFP_KERNEL);
+	if (!new_conn)
+		return -ENOMEM;
+
+	memcpy(new_conn->cc_name, group, grouplen);
+	new_conn->cc_namelen = grouplen;
+	new_conn->cc_recovery_handler = recovery_handler;
+	new_conn->cc_recovery_data = recovery_data;
+
+	/* Start the new connection at our maximum compatibility level */
+	new_conn->cc_version = lproto->lp_max_version;
+
+	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
+	if (!priv) {
+		rc = -ENOMEM;
+		goto out_free;
+	}
+
+	/* This just fills the structure in.  It is safe to use new_conn. */
+	dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb,
+			      new_conn);
+	new_conn->cc_private = priv;
+
+	/* used by the dlm code to make message headers unique, each
+	 * node in this domain must agree on this. */
+	dlm_key = crc32_le(0, group, grouplen);
+	dlm_version.pv_major = new_conn->cc_version.pv_major;
+	dlm_version.pv_minor = new_conn->cc_version.pv_minor;
+
+	dlm = dlm_register_domain(group, dlm_key, &dlm_version);
+	if (IS_ERR(dlm)) {
+		rc = PTR_ERR(dlm);
+		mlog_errno(rc);
+		goto out_free;
+	}
+
+	new_conn->cc_version.pv_major = dlm_version.pv_major;
+	new_conn->cc_version.pv_minor = dlm_version.pv_minor;
+	new_conn->cc_lockspace = dlm;
+
+	dlm_register_eviction_cb(dlm, &priv->op_eviction_cb);
+
+	*conn = new_conn;
+	return 0;
+
+out_free:
+	kfree(new_conn->cc_private);	/* NULL if priv was never attached */
+	kfree(new_conn);
+	return rc;
+}
+
+/* Tear down a connection made by ocfs2_cluster_connect(); always returns 0. */
+int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+{
+	struct o2dlm_private *o2priv = conn->cc_private;
+	struct dlm_ctxt *lockspace = conn->cc_lockspace;
+
+	dlm_unregister_eviction_cb(&o2priv->op_eviction_cb);
+	dlm_unregister_domain(lockspace);
+	kfree(o2priv);	/* only after its embedded callback is unregistered */
+	kfree(conn);
+
+	return 0;
+}
+
 void o2cb_get_stack(struct ocfs2_locking_protocol *proto)
 {
 	BUG_ON(proto == NULL);