ocfs2/dlm: fix race between dispatched_work and dlm_lockres_grab_inflight_worker

Commit ac4fef4d23ed ("ocfs2/dlm: do not purge lockres that is queued for
assert master") may have the following possible race case:

  dlm_dispatch_assert_master       dlm_wq
  ========================================================================
  queue_work(dlm->quedlm_worker,
      &dlm->dispatched_work);
                                 dispatch work,
                                 dlm_lockres_drop_inflight_worker
                                 *BUG_ON(res->inflight_assert_workers == 0)*
  dlm_lockres_grab_inflight_worker
  inflight_assert_workers++

So ensure inflight_assert_workers to be increased first.

Signed-off-by: Joseph Qi <joseph.qi@huawei.com>
Signed-off-by: Xue jiufei <xuejiufei@huawei.com>
Cc: Joel Becker <jlbec@evilplan.org>
Reviewed-by: Mark Fasheh <mfasheh@suse.de>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 3689b35..a6944b2 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -695,14 +695,6 @@
 			res->inflight_assert_workers);
 }
 
-static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
-		struct dlm_lock_resource *res)
-{
-	spin_lock(&res->spinlock);
-	__dlm_lockres_grab_inflight_worker(dlm, res);
-	spin_unlock(&res->spinlock);
-}
-
 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
 		struct dlm_lock_resource *res)
 {
@@ -1646,6 +1638,7 @@
 		}
 		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
 			     dlm->node_num, res->lockname.len, res->lockname.name);
+		spin_lock(&res->spinlock);
 		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
 						 DLM_ASSERT_MASTER_MLE_CLEANUP);
 		if (ret < 0) {
@@ -1653,7 +1646,8 @@
 			response = DLM_MASTER_RESP_ERROR;
 			dlm_lockres_put(res);
 		} else
-			dlm_lockres_grab_inflight_worker(dlm, res);
+			__dlm_lockres_grab_inflight_worker(dlm, res);
+		spin_unlock(&res->spinlock);
 	} else {
 		if (res)
 			dlm_lockres_put(res);