ceph: cleanup ceph_flush_snaps()

This patch divides __ceph_flush_snaps() into two stages. In the first
stage, __ceph_flush_snaps() assigns flush TIDs to the capsnaps and adds
them to the cap flush lists. __ceph_flush_snaps() keeps holding
i_ceph_lock during this stage, so the inode's auth cap can not change.
In the second stage, __ceph_flush_snaps() sends the flushsnap cap
messages. i_ceph_lock is unlocked before sending each cap message. If
the auth cap changes in the middle, __ceph_flush_snaps() just stops.
This is OK because kick_flushing_inode_caps() will re-send the flushsnap
cap messages to the inode's new auth MDS.
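
Roughly, the new flow looks like this (a simplified illustration of the
two stages, not the exact code from the diff below):

	/* stage 1: i_ceph_lock held across the whole loop */
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		if (capsnap->cap_flush.tid > 0)
			continue;	/* already flushed */
		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
		/* queue on mdsc->cap_flush_list and ci->i_cap_flush_list,
		   remember first_tid/last_tid */
	}

	/* stage 2: walk the assigned TIDs and send the messages */
	while (first_tid <= last_tid) {
		if (!ci->i_auth_cap || ci->i_auth_cap->session != session)
			break;	/* auth cap moved; kick_flushing_inode_caps()
				   will re-send to the new auth MDS */
		/* find the capsnap whose tid is the next one >= first_tid */
		spin_unlock(&ci->i_ceph_lock);
		__send_flush_snap(inode, session, capsnap, ...);
		spin_lock(&ci->i_ceph_lock);
		first_tid = tid + 1;
	}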

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 39e471d..736e1c8 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1247,32 +1247,20 @@
  *
  * Called under i_ceph_lock.  Takes s_mutex as needed.
  */
-void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			struct ceph_mds_session **psession)
+static void __ceph_flush_snaps(struct ceph_inode_info *ci,
+			       struct ceph_mds_session *session)
 		__releases(ci->i_ceph_lock)
 		__acquires(ci->i_ceph_lock)
 {
 	struct inode *inode = &ci->vfs_inode;
-	int mds;
+	struct ceph_mds_client *mdsc = session->s_mdsc;
 	struct ceph_cap_snap *capsnap;
-	u32 mseq;
-	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
-	struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
-						    session->s_mutex */
-	u64 oldest_flush_tid;
-	u64 next_follows = 0;  /* keep track of how far we've gotten through the
-			     i_cap_snaps list, and skip these entries next time
-			     around to avoid an infinite loop */
+	u64 oldest_flush_tid = 0;
+	u64 first_tid = 1, last_tid = 0;
 
-	if (psession)
-		session = *psession;
+	dout("__flush_snaps %p session %p\n", inode, session);
 
-	dout("__flush_snaps %p\n", inode);
-retry:
 	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
-		/* avoid an infiniute loop after retry */
-		if (capsnap->follows < next_follows)
-			continue;
 		/*
 		 * we need to wait for sync writes to complete and for dirty
 		 * pages to be written out.
@@ -1283,53 +1271,18 @@
 		/* should be removed by ceph_try_drop_cap_snap() */
 		BUG_ON(!capsnap->need_flush);
 
-		/* pick mds, take s_mutex */
-		if (ci->i_auth_cap == NULL) {
-			dout("no auth cap (migrating?), doing nothing\n");
-			goto out;
-		}
-
 		/* only flush each capsnap once */
 		if (capsnap->cap_flush.tid > 0) {
-			dout("already flushed %p, skipping\n", capsnap);
+			dout(" already flushed %p, skipping\n", capsnap);
 			continue;
 		}
 
-		mds = ci->i_auth_cap->session->s_mds;
-		mseq = ci->i_auth_cap->mseq;
-
-		if (session && session->s_mds != mds) {
-			dout("oops, wrong session %p mutex\n", session);
-
-			mutex_unlock(&session->s_mutex);
-			ceph_put_mds_session(session);
-			session = NULL;
-		}
-		if (!session) {
-			spin_unlock(&ci->i_ceph_lock);
-			mutex_lock(&mdsc->mutex);
-			session = __ceph_lookup_mds_session(mdsc, mds);
-			mutex_unlock(&mdsc->mutex);
-			if (session) {
-				dout("inverting session/ino locks on %p\n",
-				     session);
-				mutex_lock(&session->s_mutex);
-			}
-			/*
-			 * if session == NULL, we raced against a cap
-			 * deletion or migration.  retry, and we'll
-			 * get a better @mds value next time.
-			 */
-			spin_lock(&ci->i_ceph_lock);
-			goto retry;
-		}
-
 		spin_lock(&mdsc->cap_dirty_lock);
 		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
 		list_add_tail(&capsnap->cap_flush.g_list,
 			      &mdsc->cap_flush_list);
-		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
-
+		if (oldest_flush_tid == 0)
+			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
 		if (list_empty(&ci->i_flushing_item)) {
 			list_add_tail(&ci->i_flushing_item,
 				      &session->s_cap_flushing);
@@ -1339,41 +1292,108 @@
 		list_add_tail(&capsnap->cap_flush.i_list,
 			      &ci->i_cap_flush_list);
 
+		if (first_tid == 1)
+			first_tid = capsnap->cap_flush.tid;
+		last_tid = capsnap->cap_flush.tid;
+	}
+
+	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
+
+	while (first_tid <= last_tid) {
+		struct ceph_cap *cap = ci->i_auth_cap;
+		struct ceph_cap_flush *cf;
+		int ret;
+
+		if (!(cap && cap->session == session)) {
+			dout("__flush_snaps %p auth cap %p not mds%d, "
+			     "stop\n", inode, cap, session->s_mds);
+			break;
+		}
+
+		ret = -ENOENT;
+		list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
+			if (cf->tid >= first_tid) {
+				ret = 0;
+				break;
+			}
+		}
+		if (ret < 0)
+			break;
+
+		first_tid = cf->tid + 1;
+
+		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
 		atomic_inc(&capsnap->nref);
 		spin_unlock(&ci->i_ceph_lock);
 
-		dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
-		     inode, capsnap, capsnap->follows, capsnap->cap_flush.tid);
-		__send_flush_snap(inode, session, capsnap, mseq,
-				  oldest_flush_tid);
+		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
+		     inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));
 
-		next_follows = capsnap->follows + 1;
+		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
+					oldest_flush_tid);
+		if (ret < 0) {
+			pr_err("__flush_snaps: error sending cap flushsnap, "
+			       "ino (%llx.%llx) tid %llu follows %llu\n",
+				ceph_vinop(inode), cf->tid, capsnap->follows);
+		}
+
 		ceph_put_cap_snap(capsnap);
-
 		spin_lock(&ci->i_ceph_lock);
+	}
+}
+
+void ceph_flush_snaps(struct ceph_inode_info *ci,
+		      struct ceph_mds_session **psession)
+{
+	struct inode *inode = &ci->vfs_inode;
+	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+	struct ceph_mds_session *session = psession ? *psession : NULL;
+	int mds;
+	dout("ceph_flush_snaps %p\n", inode);
+retry:
+	spin_lock(&ci->i_ceph_lock);
+	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
+		dout(" no capsnap needs flush, doing nothing\n");
+		goto out;
+	}
+	if (!ci->i_auth_cap) {
+		dout(" no auth cap (migrating?), doing nothing\n");
+		goto out;
+	}
+
+	mds = ci->i_auth_cap->session->s_mds;
+	if (session && session->s_mds != mds) {
+		dout(" oops, wrong session %p mutex\n", session);
+		mutex_unlock(&session->s_mutex);
+		ceph_put_mds_session(session);
+		session = NULL;
+	}
+	if (!session) {
+		spin_unlock(&ci->i_ceph_lock);
+		mutex_lock(&mdsc->mutex);
+		session = __ceph_lookup_mds_session(mdsc, mds);
+		mutex_unlock(&mdsc->mutex);
+		if (session) {
+			dout(" inverting session/ino locks on %p\n", session);
+			mutex_lock(&session->s_mutex);
+		}
 		goto retry;
 	}
 
+	__ceph_flush_snaps(ci, session);
+out:
+	spin_unlock(&ci->i_ceph_lock);
+
+	if (psession) {
+		*psession = session;
+	} else if (session) {
+		mutex_unlock(&session->s_mutex);
+		ceph_put_mds_session(session);
+	}
 	/* we flushed them all; remove this inode from the queue */
 	spin_lock(&mdsc->snap_flush_lock);
 	list_del_init(&ci->i_snap_flush_item);
 	spin_unlock(&mdsc->snap_flush_lock);
-
-out:
-	if (psession)
-		*psession = session;
-	else if (session) {
-		mutex_unlock(&session->s_mutex);
-		ceph_put_mds_session(session);
-	}
-}
-
-static void ceph_flush_snaps(struct ceph_inode_info *ci)
-{
-	spin_lock(&ci->i_ceph_lock);
-	__ceph_flush_snaps(ci, NULL);
-	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
-	spin_unlock(&ci->i_ceph_lock);
 }
 
 /*
@@ -1768,10 +1788,9 @@
 						     oldest_flush_tid);
 				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 			}
-			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
-				__ceph_flush_snaps(ci, &session);
-				ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
-			}
+			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
+				__ceph_flush_snaps(ci, session);
+
 			goto retry_locked;
 		}
 
@@ -2610,7 +2629,7 @@
 	if (last && !flushsnaps)
 		ceph_check_caps(ci, 0, NULL);
 	else if (flushsnaps)
-		ceph_flush_snaps(ci);
+		ceph_flush_snaps(ci, NULL);
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 	while (put-- > 0)
@@ -2691,7 +2710,7 @@
 	if (last) {
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 	} else if (flush_snaps) {
-		ceph_flush_snaps(ci);
+		ceph_flush_snaps(ci, NULL);
 	}
 	if (complete_capsnap)
 		wake_up_all(&ci->i_cap_wq);