ocfs2_dlm: fix cluster-wide refcounting of lock resources

This was previously broken and migration of some locks had to be temporarily
disabled. We use a new (and backward-incompatible) set of network messages
to account for all references to a lock resource held across the cluster.
Once these are all freed, the master node may then free the lock resource
memory once its local references are dropped.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 0c822f3..620eb82 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -54,9 +54,6 @@
 #include "cluster/masklog.h"
 
 static int dlm_thread(void *data);
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-				  struct dlm_lock_resource *lockres);
-
 static void dlm_flush_asts(struct dlm_ctxt *dlm);
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
@@ -82,14 +79,33 @@
 	current->state = TASK_RUNNING;
 }
 
-
-int __dlm_lockres_unused(struct dlm_lock_resource *res)
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
 {
 	if (list_empty(&res->granted) &&
 	    list_empty(&res->converting) &&
-	    list_empty(&res->blocked) &&
-	    list_empty(&res->dirty))
-		return 1;
+	    list_empty(&res->blocked))
+		return 0;
+	return 1;
+}
+
+/* "unused": the lockres has no locks, is not on the dirty list,
+ * has no inflight locks (in the gap between mastery and acquiring
+ * the first lock), and has no bits in its refmap.
+ * truly ready to be freed. */
+int __dlm_lockres_unused(struct dlm_lock_resource *res)
+{
+	if (!__dlm_lockres_has_locks(res) &&
+	    list_empty(&res->dirty)) {
+		/* try not to scan the bitmap unless the first two
+		 * conditions are already true */
+		int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
+		if (bit >= O2NM_MAX_NODES) {
+			/* since the bit for dlm->node_num is not
+			 * set, inflight_locks better be zero */
+			BUG_ON(res->inflight_locks != 0);
+			return 1;
+		}
+	}
 	return 0;
 }
 
@@ -106,46 +122,21 @@
 	assert_spin_locked(&res->spinlock);
 
 	if (__dlm_lockres_unused(res)){
-		/* For now, just keep any resource we master */
-		if (res->owner == dlm->node_num)
-		{
-			if (!list_empty(&res->purge)) {
-				mlog(0, "we master %s:%.*s, but it is on "
-				     "the purge list.  Removing\n",
-				     dlm->name, res->lockname.len,
-				     res->lockname.name);
-				list_del_init(&res->purge);
-				dlm->purge_count--;
-			}
-			return;
-		}
-
 		if (list_empty(&res->purge)) {
-			mlog(0, "putting lockres %.*s from purge list\n",
-			     res->lockname.len, res->lockname.name);
+			mlog(0, "putting lockres %.*s:%p onto purge list\n",
+			     res->lockname.len, res->lockname.name, res);
 
 			res->last_used = jiffies;
+			dlm_lockres_get(res);
 			list_add_tail(&res->purge, &dlm->purge_list);
 			dlm->purge_count++;
-
-			/* if this node is not the owner, there is
-			 * no way to keep track of who the owner could be.
-			 * unhash it to avoid serious problems. */
-			if (res->owner != dlm->node_num) {
-				mlog(0, "%s:%.*s: doing immediate "
-				     "purge of lockres owned by %u\n",
-				     dlm->name, res->lockname.len,
-				     res->lockname.name, res->owner);
-
-				dlm_purge_lockres_now(dlm, res);
-			}
 		}
 	} else if (!list_empty(&res->purge)) {
-		mlog(0, "removing lockres %.*s from purge list, "
-		     "owner=%u\n", res->lockname.len, res->lockname.name,
-		     res->owner);
+		mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
+		     res->lockname.len, res->lockname.name, res, res->owner);
 
 		list_del_init(&res->purge);
+		dlm_lockres_put(res);
 		dlm->purge_count--;
 	}
 }
@@ -163,68 +154,60 @@
 	spin_unlock(&dlm->spinlock);
 }
 
-/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
- * to do migration, but will re-acquire before exit. */
-void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
+int dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 {
 	int master;
-	int ret;
+	int ret = 0;
 
-	spin_lock(&lockres->spinlock);
-	master = lockres->owner == dlm->node_num;
-	spin_unlock(&lockres->spinlock);
-
-	mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
-	     lockres->lockname.name, master);
-
-	/* Non master is the easy case -- no migration required, just
-	 * quit. */
+	spin_lock(&res->spinlock);
+	if (!__dlm_lockres_unused(res)) {
+		spin_unlock(&res->spinlock);
+		mlog(0, "%s:%.*s: tried to purge but not unused\n",
+		     dlm->name, res->lockname.len, res->lockname.name);
+		return -ENOTEMPTY;
+	}
+	master = (res->owner == dlm->node_num);
 	if (!master)
-		goto finish;
+		res->state |= DLM_LOCK_RES_DROPPING_REF;
+	spin_unlock(&res->spinlock);
 
-	/* Wheee! Migrate lockres here! */
-	spin_unlock(&dlm->spinlock);
-again:
+	mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
+	     res->lockname.name, master);
 
-	ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
-	if (ret == -ENOTEMPTY) {
-		mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
-		     lockres->lockname.len, lockres->lockname.name);
-
-		BUG();
-	} else if (ret < 0) {
-		mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
-		     lockres->lockname.len, lockres->lockname.name);
-		msleep(100);
-		goto again;
+	if (!master) {
+		/* drop spinlock to do messaging, retake below */
+		spin_unlock(&dlm->spinlock);
+		/* clear our bit from the master's refmap, ignore errors */
+		ret = dlm_drop_lockres_ref(dlm, res);
+		if (ret < 0) {
+			mlog_errno(ret);
+			if (!dlm_is_host_down(ret))
+				BUG();
+		}
+		mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
+		     dlm->name, res->lockname.len, res->lockname.name, ret);
+		spin_lock(&dlm->spinlock);
 	}
 
-	spin_lock(&dlm->spinlock);
-
-finish:
-	if (!list_empty(&lockres->purge)) {
-		list_del_init(&lockres->purge);
+	if (!list_empty(&res->purge)) {
+		mlog(0, "removing lockres %.*s:%p from purgelist, "
+		     "master = %d\n", res->lockname.len, res->lockname.name,
+		     res, master);
+		list_del_init(&res->purge);
+		dlm_lockres_put(res);
 		dlm->purge_count--;
 	}
-	__dlm_unhash_lockres(lockres);
-}
+	__dlm_unhash_lockres(res);
 
-/* make an unused lockres go away immediately.
- * as soon as the dlm spinlock is dropped, this lockres
- * will not be found. kfree still happens on last put. */
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-				  struct dlm_lock_resource *lockres)
-{
-	assert_spin_locked(&dlm->spinlock);
-	assert_spin_locked(&lockres->spinlock);
-
-	BUG_ON(!__dlm_lockres_unused(lockres));
-
-	if (!list_empty(&lockres->purge)) {
-		list_del_init(&lockres->purge);
-		dlm->purge_count--;
+	/* lockres is not in the hash now.  drop the flag and wake up
+	 * any processes waiting in dlm_get_lock_resource. */
+	if (!master) {
+		spin_lock(&res->spinlock);
+		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+		spin_unlock(&res->spinlock);
+		wake_up(&res->wq);
 	}
-	__dlm_unhash_lockres(lockres);
+	return 0;
 }
 
 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -268,13 +251,17 @@
 			break;
 		}
 
+		mlog(0, "removing lockres %.*s:%p from purgelist\n",
+		     lockres->lockname.len, lockres->lockname.name, lockres);
 		list_del_init(&lockres->purge);
+		dlm_lockres_put(lockres);
 		dlm->purge_count--;
 
 		/* This may drop and reacquire the dlm spinlock if it
 		 * has to do migration. */
 		mlog(0, "calling dlm_purge_lockres!\n");
-		dlm_purge_lockres(dlm, lockres);
+		if (dlm_purge_lockres(dlm, lockres))
+			BUG();
 		mlog(0, "DONE calling dlm_purge_lockres!\n");
 
 		/* Avoid adding any scheduling latencies */