md-cluster: add the error check if failed to get dlm lock

In complicated cluster environment, it is possible that the
dlm lock couldn't be get/convert on purpose, the related err
info is added for better debug potential issue.

For lockres_free, if the lock is blocking by a lock request or
conversion request, then dlm_unlock just put it back to grant
queue, so need to ensure the lock is free finally.

Signed-off-by: Guoqing Jiang <gqjiang@suse.com>
Signed-off-by: NeilBrown <neilb@suse.com>
diff --git a/drivers/md/md-cluster.c b/drivers/md/md-cluster.c
index 2a57f19..b80a689 100644
--- a/drivers/md/md-cluster.c
+++ b/drivers/md/md-cluster.c
@@ -166,10 +166,24 @@
 
 static void lockres_free(struct dlm_lock_resource *res)
 {
+	int ret;
+
 	if (!res)
 		return;
 
-	dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
+	/* cancel a lock request or a conversion request that is blocked */
+	res->flags |= DLM_LKF_CANCEL;
+retry:
+	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
+	if (unlikely(ret != 0)) {
+		pr_info("%s: failed to unlock %s return %d\n", __func__, res->name, ret);
+
+		/* if a lock conversion is cancelled, then the lock is put
+		 * back to grant queue, need to ensure it is unlocked */
+		if (ret == -DLM_ECANCEL)
+			goto retry;
+	}
+	res->flags &= ~DLM_LKF_CANCEL;
 	wait_for_completion(&res->completion);
 
 	kfree(res->name);
@@ -474,6 +488,7 @@
 	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
 	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
 	struct cluster_msg msg;
+	int ret;
 
 	/*get CR on Message*/
 	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
@@ -486,13 +501,21 @@
 	process_recvd_msg(thread->mddev, &msg);
 
 	/*release CR on ack_lockres*/
-	dlm_unlock_sync(ack_lockres);
+	ret = dlm_unlock_sync(ack_lockres);
+	if (unlikely(ret != 0))
+		pr_info("unlock ack failed return %d\n", ret);
 	/*up-convert to PR on message_lockres*/
-	dlm_lock_sync(message_lockres, DLM_LOCK_PR);
+	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
+	if (unlikely(ret != 0))
+		pr_info("lock PR on msg failed return %d\n", ret);
 	/*get CR on ack_lockres again*/
-	dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
+	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
+	if (unlikely(ret != 0))
+		pr_info("lock CR on ack failed return %d\n", ret);
 	/*release CR on message_lockres*/
-	dlm_unlock_sync(message_lockres);
+	ret = dlm_unlock_sync(message_lockres);
+	if (unlikely(ret != 0))
+		pr_info("unlock msg failed return %d\n", ret);
 }
 
 /* lock_comm()
@@ -567,7 +590,13 @@
 	}
 
 failed_ack:
-	dlm_unlock_sync(cinfo->message_lockres);
+	error = dlm_unlock_sync(cinfo->message_lockres);
+	if (unlikely(error != 0)) {
+		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
+			error);
+		/* in case the message can't be released due to some reason */
+		goto failed_ack;
+	}
 failed_message:
 	return error;
 }