ceph: fix race of queuing delayed caps

When called with CHECK_CAPS_AUTHONLY flag, ceph_check_caps() only
processes auth caps. In that case, it's unsafe to remove inode
from mdsc->cap_delay_list, because there can be delayed non-auth
caps.

Besides, ceph_check_caps() may lock/unlock i_ceph_lock several
times, when multiple threads call ceph_check_caps() at the same
time. It's possible that one thread calls __cap_delay_requeue(),
another thread calls __cap_delay_cancel(). __cap_delay_cancel()
should be called at very beginning of ceph_check_caps(), so that
it does not race with __cap_delay_requeue().

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 36f8a4b..1726ddc 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -902,6 +902,11 @@
 /*
  * called under i_ceph_lock
  */
+static int __ceph_is_single_caps(struct ceph_inode_info *ci)
+{
+	return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
+}
+
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
 	return !RB_EMPTY_ROOT(&ci->i_caps);
@@ -1715,21 +1720,24 @@
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
 			   to avoid an infinite loop on retry */
 	struct rb_node *p;
-	int delayed = 0, sent = 0, num;
-	bool is_delayed = flags & CHECK_CAPS_NODELAY;
+	int delayed = 0, sent = 0;
+	bool no_delay = flags & CHECK_CAPS_NODELAY;
 	bool queue_invalidate = false;
-	bool force_requeue = false;
 	bool tried_invalidate = false;
 
 	/* if we are unmounting, flush any unused caps immediately. */
 	if (mdsc->stopping)
-		is_delayed = true;
+		no_delay = true;
 
 	spin_lock(&ci->i_ceph_lock);
 
 	if (ci->i_ceph_flags & CEPH_I_FLUSH)
 		flags |= CHECK_CAPS_FLUSH;
 
+	if (!(flags & CHECK_CAPS_AUTHONLY) ||
+	    (ci->i_auth_cap && __ceph_is_single_caps(ci)))
+		__cap_delay_cancel(mdsc, ci);
+
 	goto retry_locked;
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1784,7 +1792,7 @@
 	 * have cached pages, but don't want them, then try to invalidate.
 	 * If we fail, it's because pages are locked.... try again later.
 	 */
-	if ((!is_delayed || mdsc->stopping) &&
+	if ((!no_delay || mdsc->stopping) &&
 	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
 	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
 	    inode->i_data.nrpages &&		/* have cached pages */
@@ -1801,10 +1809,8 @@
 		goto retry_locked;
 	}
 
-	num = 0;
 	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
 		cap = rb_entry(p, struct ceph_cap, ci_node);
-		num++;
 
 		/* avoid looping forever */
 		if (mds >= cap->mds ||
@@ -1867,7 +1873,7 @@
 		    cap->mds_wanted == want)
 			continue;     /* nope, all good */
 
-		if (is_delayed)
+		if (no_delay)
 			goto ack;
 
 		/* delay? */
@@ -1958,15 +1964,8 @@
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
-	/*
-	 * Reschedule delayed caps release if we delayed anything,
-	 * otherwise cancel.
-	 */
-	if (delayed && is_delayed)
-		force_requeue = true;   /* __send_cap delayed release; requeue */
-	if (!delayed && !is_delayed)
-		__cap_delay_cancel(mdsc, ci);
-	else if (!is_delayed || force_requeue)
+	/* Reschedule delayed caps release if we delayed anything */
+	if (delayed)
 		__cap_delay_requeue(mdsc, ci);
 
 	spin_unlock(&ci->i_ceph_lock);